stats refactored

This commit is contained in:
2022-09-19 16:50:43 +03:00
parent dffdff2869
commit 4536370c79
24 changed files with 355 additions and 410 deletions

View File

@@ -1,29 +1,7 @@
import asyncio
from datetime import datetime
from enum import Enum as Enumeration
from sqlalchemy import Column, DateTime, ForeignKey, Boolean
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.types import Enum as ColumnEnum
from base.orm import Base, local_session
from orm.topic import ShoutTopic
class ReactionKind(Enumeration):
AGREE = 1 # +1
DISAGREE = 2 # -1
PROOF = 3 # +1
DISPROOF = 4 # -1
ASK = 5 # +0 bookmark
PROPOSE = 6 # +0
QUOTE = 7 # +0 bookmark
COMMENT = 8 # +0
ACCEPT = 9 # +1
REJECT = 0 # -1
LIKE = 11 # +1
DISLIKE = 12 # -1
# TYPE = <reaction index> # rating diff
from base.orm import local_session
from orm.reaction import ReactionKind, Reaction
from services.zine.topics import TopicStorage
def kind_to_rate(kind) -> int:
@@ -45,18 +23,6 @@ def kind_to_rate(kind) -> int:
return 0
class ReactedByDay(Base):
__tablename__ = "reacted_by_day"
id = None # type: ignore
reaction = Column(ForeignKey("reaction.id"), primary_key=True)
shout = Column(ForeignKey("shout.slug"), primary_key=True)
replyTo = Column(ForeignKey("reaction.id"), nullable=True)
kind = Column(ColumnEnum(ReactionKind), nullable=False, comment="Reaction kind")
day = Column(DateTime, primary_key=True, default=datetime.now)
comment = Column(Boolean, default=False)
class ReactedStorage:
reacted = {"shouts": {}, "topics": {}, "reactions": {}}
rating = {"shouts": {}, "topics": {}, "reactions": {}}
@@ -64,6 +30,7 @@ class ReactedStorage:
to_flush = []
period = 30 * 60 # sec
lock = asyncio.Lock()
modified_shouts = set([])
@staticmethod
async def get_shout(shout_slug):
@@ -82,7 +49,7 @@ class ReactedStorage:
self = ReactedStorage
async with self.lock:
return list(
filter(lambda r: r.comment, self.reacted["shouts"].get(shout_slug, {}))
filter(lambda r: bool(r.body), self.reacted["shouts"].get(shout_slug, {}))
)
@staticmethod
@@ -90,7 +57,7 @@ class ReactedStorage:
self = ReactedStorage
async with self.lock:
return list(
filter(lambda r: r.comment, self.reacted["topics"].get(topic_slug, []))
filter(lambda r: bool(r.body), self.reacted["topics"].get(topic_slug, []))
)
@staticmethod
@@ -99,7 +66,7 @@ class ReactedStorage:
async with self.lock:
return list(
filter(
lambda r: r.comment, self.reacted["reactions"].get(reaction_id, {})
lambda r: bool(r.body), self.reacted["reactions"].get(reaction_id, {})
)
)
@@ -138,118 +105,62 @@ class ReactedStorage:
@staticmethod
async def react(reaction):
ReactedStorage.modified_shouts.add(reaction.shout)
@staticmethod
async def recount(reactions):
self = ReactedStorage
async with self.lock:
reactions = {}
# iterate sibling reactions
reactions = self.reacted["shouts"].get(reaction.shout, {})
for r in reactions.values():
reaction = ReactedByDay.create({
"day": datetime.now().replace(
hour=0, minute=0, second=0, microsecond=0
),
"reaction": r.id,
"kind": r.kind,
"shout": r.shout,
"comment": bool(r.body),
"replyTo": r.replyTo
})
# renew sorted by shouts store
self.reacted["shouts"][reaction.shout] = self.reacted["shouts"].get(reaction.shout, [])
self.reacted["shouts"][reaction.shout].append(reaction)
if reaction.replyTo:
self.reacted["reaction"][reaction.replyTo] = self.reacted[
"reactions"
].get(reaction.shout, [])
self.reacted["reaction"][reaction.replyTo].append(reaction)
self.rating["reactions"][reaction.replyTo] = self.rating[
"reactions"
].get(reaction.replyTo, 0) + kind_to_rate(reaction.kind)
else:
# rate only by root reactions on shout
self.rating["shouts"][reaction.replyTo] = self.rating["shouts"].get(
reaction.shout, 0
) + kind_to_rate(reaction.kind)
flag_modified(reaction, "value")
for r in reactions:
# renew shout counters
self.reacted["shouts"][r.shout] = self.reacted["shouts"].get(r.shout, [])
self.reacted["shouts"][r.shout].append(r)
# renew topics counters
shout_topics = await TopicStorage.get_topics_by_slugs([r.shout, ])
for t in shout_topics:
self.reacted["topics"][t] = self.reacted["topics"].get(t, [])
self.reacted["topics"][t].append(r)
self.rating["topics"][t] = \
self.rating["topics"].get(t, 0) + kind_to_rate(r.kind)
if r.replyTo:
# renew reaction counters
self.reacted["reactions"][r.replyTo] = \
self.reacted["reactions"].get(r.replyTo, [])
self.reacted["reactions"][r.replyTo].append(r)
self.rating["reactions"][r.replyTo] = \
self.rating["reactions"].get(r.replyTo, 0) + kind_to_rate(r.kind)
else:
# renew shout rating
self.rating["shouts"][r.shout] = \
self.rating["shouts"].get(r.shout, 0) + kind_to_rate(r.kind)
@staticmethod
def init(session):
self = ReactedStorage
all_reactions = session.query(ReactedByDay).all()
print("[stat.reacted] %d reactions total" % len(all_reactions))
for reaction in all_reactions:
shout = reaction.shout
topics = (
session.query(ShoutTopic.topic).where(ShoutTopic.shout == shout).all()
)
kind = reaction.kind
self.reacted["shouts"][shout] = self.reacted["shouts"].get(shout, [])
self.reacted["shouts"][shout].append(reaction)
self.rating["shouts"][shout] = self.rating["shouts"].get(
shout, 0
) + kind_to_rate(kind)
for t in topics:
self.reacted["topics"][t] = self.reacted["topics"].get(t, [])
self.reacted["topics"][t].append(reaction)
self.rating["topics"][t] = self.rating["topics"].get(
t, 0
) + kind_to_rate(
kind
) # rating
if reaction.replyTo:
self.reacted["reactions"][reaction.replyTo] = self.reacted[
"reactions"
].get(reaction.replyTo, [])
self.reacted["reactions"][reaction.replyTo].append(reaction)
self.rating["reactions"][reaction.replyTo] = self.rating[
"reactions"
].get(reaction.replyTo, 0) + kind_to_rate(reaction.kind)
ttt = self.reacted["topics"].values()
print("[stat.reacted] %d topics reacted" % len(ttt))
print("[stat.reacted] %d shouts reacted" % len(self.reacted["shouts"]))
print("[stat.reacted] %d reactions reacted" % len(self.reacted["reactions"]))
all_reactions = session.query(Reaction).all()
self.modified_shouts = set([r.shout for r in all_reactions])
print("[stat.reacted] %d shouts with reactions updates" % len(self.modified_shouts))
@staticmethod
async def flush_changes(session):
async def recount_changed(session):
self = ReactedStorage
async with self.lock:
for slug in dict(self.reacted["shouts"]).keys():
topics = (
session.query(ShoutTopic.topic)
.where(ShoutTopic.shout == slug)
.all()
)
reactions = self.reacted["shouts"].get(slug, [])
# print('[stat.reacted] shout {' + str(slug) + "}: " + str(len(reactions)))
for ts in list(topics):
tslug = ts[0]
topic_reactions = self.reacted["topics"].get(tslug, [])
topic_reactions += reactions
# print('[stat.reacted] topic {' + str(tslug) + "}: " + str(len(topic_reactions)))
reactions += list(self.reacted["reactions"].values())
for reaction in reactions:
if getattr(reaction, "modified", False):
session.add(reaction)
flag_modified(reaction, "value")
reaction.modified = False
# print('flushing')
for reaction in self.to_flush:
session.add(reaction)
self.to_flush.clear()
session.commit()
print('[stat.reacted] recounting...')
for slug in list(self.modified_shouts):
siblings = session.query(Reaction).where(Reaction.shout == slug).all()
await self.recount(siblings)
print("[stat.reacted] %d shouts with reactions updates" % len(self.modified_shouts))
print("[stat.reacted] %d topics reacted" % len(self.reacted["topics"].values()))
print("[stat.reacted] %d shouts reacted" % len(self.reacted["shouts"]))
print("[stat.reacted] %d reactions reacted" % len(self.reacted["reactions"]))
self.modified_shouts = set([])
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
await ReactedStorage().flush_changes(session)
print("[stat.reacted] periodical flush")
await ReactedStorage.recount_changed(session)
except Exception as err:
print("[stat.reacted] errror: %s" % (err))
print("[stat.reacted] recount error %s" % (err))
await asyncio.sleep(ReactedStorage.period)

View File

@@ -1,17 +1,15 @@
import asyncio
from base.orm import local_session
from orm.shout import Shout
from orm.topic import ShoutTopic, TopicFollower
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
from orm.shout import Shout, ShoutTopic
from orm.topic import TopicFollower
from services.zine.shoutauthor import ShoutAuthorStorage
class TopicStat:
shouts_by_topic = {}
authors_by_topic = {}
followers_by_topic = {}
shouts_by_topic = {} # Shout object stored
authors_by_topic = {} # User
followers_by_topic = {} # User
lock = asyncio.Lock()
period = 30 * 60 # sec
@@ -19,38 +17,33 @@ class TopicStat:
async def load_stat(session):
self = TopicStat
shout_topics = session.query(ShoutTopic).all()
print("[stat.topics] shout topics amount", len(shout_topics))
print("[stat.topics] shouts linked %d times" % len(shout_topics))
for shout_topic in shout_topics:
tpc = shout_topic.topic
# shouts by topics
topic = shout_topic.topic
shout = shout_topic.shout
sss = set(self.shouts_by_topic.get(topic, []))
shout = session.query(Shout).where(Shout.slug == shout).first()
sss.union(
[
shout,
]
)
self.shouts_by_topic[topic] = list(sss)
shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first()
self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, [])
if shout not in self.shouts_by_topic[tpc]:
self.shouts_by_topic[tpc].append(shout)
# authors by topics
authors = await ShoutAuthorStorage.get_authors(shout)
aaa = set(self.authors_by_topic.get(topic, []))
aaa.union(authors)
self.authors_by_topic[topic] = list(aaa)
authors = await ShoutAuthorStorage.get_authors(shout.slug)
self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, [])
for a in authors:
if a not in self.authors_by_topic[tpc]:
self.authors_by_topic[tpc].append(a)
print("[stat.topics] authors sorted")
print("[stat.topics] shouts sorted")
print("[stat.topics] shouts indexed by %d topics" % len(self.shouts_by_topic.keys()))
print("[stat.topics] authors indexed by %d topics" % len(self.authors_by_topic.keys()))
self.followers_by_topic = {}
followings = session.query(TopicFollower)
followings = session.query(TopicFollower).all()
for flw in followings:
topic = flw.topic
user = flw.follower
if topic not in self.followers_by_topic:
self.followers_by_topic[topic] = []
self.followers_by_topic[topic].append(user)
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, [])
if user not in self.followers_by_topic[topic]:
self.followers_by_topic[topic].append(user)
print("[stat.topics] followers sorted")
@staticmethod
@@ -59,23 +52,6 @@ class TopicStat:
async with self.lock:
return self.shouts_by_topic.get(topic, [])
@staticmethod
async def get_stat(topic):
self = TopicStat
async with self.lock:
shouts = self.shouts_by_topic.get(topic, [])
followers = self.followers_by_topic.get(topic, [])
authors = self.authors_by_topic.get(topic, [])
return {
"shouts": len(shouts),
"authors": len(authors),
"followers": len(followers),
"viewed": await ViewedStorage.get_topic(topic),
"reacted": len(await ReactedStorage.get_topic(topic)),
"commented": len(await ReactedStorage.get_topic_comments(topic)),
"rating": await ReactedStorage.get_topic_rating(topic),
}
@staticmethod
async def worker():
self = TopicStat
@@ -84,7 +60,6 @@ class TopicStat:
with local_session() as session:
async with self.lock:
await self.load_stat(session)
print("[stat.topics] periodical update")
except Exception as err:
print("[stat.topics] errror: %s" % (err))
raise Exception(err)
await asyncio.sleep(self.period)

View File

@@ -1,20 +1,12 @@
import asyncio
from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer
from base.orm import local_session
from sqlalchemy.orm.attributes import flag_modified
from base.orm import Base, local_session
from orm.topic import ShoutTopic
class ViewedByDay(Base):
__tablename__ = "viewed_by_day"
id = None
shout = Column(ForeignKey("shout.slug"), primary_key=True)
day = Column(DateTime, primary_key=True, default=datetime.now)
value = Column(Integer)
from orm.shout import ShoutTopic
from orm.viewed import ViewedByDay
class ViewedStorage:
@@ -47,7 +39,7 @@ class ViewedStorage:
if this_day_view.day < view.day:
self.this_day_views[shout] = view
print("[stat.viewed] %d shouts viewed" % len(views))
print("[stat.viewed] %d shouts viewed" % len(self.viewed['shouts']))
@staticmethod
async def get_shout(shout_slug):
@@ -68,7 +60,7 @@ class ViewedStorage:
return self.viewed["reactions"].get(reaction_id, 0)
@staticmethod
async def increment(shout_slug):
async def increment(shout_slug, amount=1):
self = ViewedStorage
async with self.lock:
this_day_view = self.this_day_views.get(shout_slug)
@@ -79,11 +71,9 @@ class ViewedStorage:
this_day_view = ViewedByDay.create(shout=shout_slug, value=1)
self.this_day_views[shout_slug] = this_day_view
else:
this_day_view.value = this_day_view.value + 1
this_day_view.value = this_day_view.value + amount
this_day_view.modified = True
self.viewed["shouts"][shout_slug] = (
self.viewed["shouts"].get(shout_slug, 0) + 1
)
self.viewed["shouts"][shout_slug] = (self.viewed["shouts"].get(shout_slug, 0) + amount)
with local_session() as session:
topics = (
session.query(ShoutTopic.topic)
@@ -91,7 +81,7 @@ class ViewedStorage:
.all()
)
for t in topics:
self.viewed["topics"][t] = self.viewed["topics"].get(t, 0) + 1
self.viewed["topics"][t] = self.viewed["topics"].get(t, 0) + amount
flag_modified(this_day_view, "value")
@staticmethod