ShoutViewStorage refactor

This commit is contained in:
knst-kotov 2021-09-29 15:59:48 +03:00
parent 6e451c8095
commit c32fc43538
4 changed files with 59 additions and 38 deletions

View File

@ -13,7 +13,9 @@ from auth.oauth import oauth_login, oauth_authorize
from auth.email import email_authorize from auth.email import email_authorize
from redis import redis from redis import redis
from resolvers.base import resolvers from resolvers.base import resolvers
from resolvers.zine import GitTask, TopShouts, db_flush_worker from resolvers.zine import GitTask, TopShouts
from orm.shout import ShoutViewStorage
import asyncio import asyncio
@ -29,7 +31,7 @@ async def start_up():
await redis.connect() await redis.connect()
git_task = asyncio.create_task(GitTask.git_task_worker()) git_task = asyncio.create_task(GitTask.git_task_worker())
top_shouts_task = asyncio.create_task(TopShouts.worker()) top_shouts_task = asyncio.create_task(TopShouts.worker())
db_flush_task = asyncio.create_task(db_flush_worker()) view_storage_task = asyncio.create_task(ShoutViewStorage.worker())
async def shutdown(): async def shutdown():
await redis.disconnect() await redis.disconnect()

View File

@ -17,4 +17,4 @@ Resource.init_table()
with local_session() as session: with local_session() as session:
rating_storage = ShoutRatingStorage(session) rating_storage = ShoutRatingStorage(session)
view_storage = ShoutViewStorage(session) ShoutViewStorage.init(session)

View File

@ -1,14 +1,16 @@
from typing import List from typing import List
from datetime import datetime from datetime import datetime, timedelta
from sqlalchemy import Table, Column, Integer, String, ForeignKey, DateTime, Boolean from sqlalchemy import Table, Column, Integer, String, ForeignKey, DateTime, Boolean
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.attributes import flag_modified
from orm import Permission, User, Topic from orm import Permission, User, Topic
from orm.comment import Comment from orm.comment import Comment
from orm.base import Base from orm.base import Base, local_session
from functools import reduce from functools import reduce
import asyncio
class ShoutAuthor(Base): class ShoutAuthor(Base):
__tablename__ = "shout_author" __tablename__ = "shout_author"
@ -68,9 +70,17 @@ class ShoutViewByDay(Base):
class ShoutViewStorage: class ShoutViewStorage:
def __init__(self, session): views = []
this_day_views = {}
period = 30*60 #sec
lock = asyncio.Lock()
@staticmethod
def init(session):
self = ShoutViewStorage
self.views = session.query(ShoutViewByDay).all() self.views = session.query(ShoutViewByDay).all()
self.this_day_views = {}
for view in self.views: for view in self.views:
shout_id = view.shout_id shout_id = view.shout_id
if not shout_id in self.this_day_views: if not shout_id in self.this_day_views:
@ -79,28 +89,48 @@ class ShoutViewStorage:
if this_day_view.day < view.day: if this_day_view.day < view.day:
self.this_day_views[shout_id] = view self.this_day_views[shout_id] = view
def get_view(self, shout_id): @staticmethod
shout_views = list(filter(lambda x: x.shout_id == shout_id, self.views)) async def get_view(shout_id):
async with ShoutViewStorage.lock:
shout_views = list(filter(lambda x: x.shout_id == shout_id, ShoutViewStorage.views))
return reduce((lambda x, y: x + y.value), shout_views, 0) return reduce((lambda x, y: x + y.value), shout_views, 0)
def inc_view(self, shout_id): @staticmethod
this_day_view = self.this_day_views.get(shout_id) async def inc_view(shout_id):
if not this_day_view: self = ShoutViewStorage
this_day_view = ShoutViewByDay.create(shout_id = shout_id, value = 1) async with ShoutViewStorage.lock:
self.this_day_views[shout_id] = this_day_view this_day_view = self.this_day_views.get(shout_id)
self.views.append(this_day_view) day_start = datetime.now().replace(hour = 0, minute = 0, second = 0)
else: if not this_day_view or this_day_view.day < day_start:
this_day_view.value = this_day_view.value + 1 this_day_view = ShoutViewByDay.create(shout_id = shout_id, value = 1)
this_day_view.modified = True self.this_day_views[shout_id] = this_day_view
self.views.append(this_day_view)
else:
this_day_view.value = this_day_view.value + 1
this_day_view.modified = True
def flush_changes(self, session): @staticmethod
for view in self.this_day_views.values(): async def flush_changes(session):
if getattr(view, "modified", False): async with ShoutViewStorage.lock:
session.add(view) for view in ShoutViewStorage.this_day_views.values():
flag_modified(view, "value") if getattr(view, "modified", False):
view.modified = False session.add(view)
flag_modified(view, "value")
view.modified = False
session.commit() session.commit()
@staticmethod
async def worker():
print("ShoutViewStorage worker start")
while True:
try:
print("ShoutViewStorage worker: flush changes")
with local_session() as session:
await ShoutViewStorage.flush_changes(session)
except Exception as err:
print("ShoutViewStorage worker: error = %s" % (err))
await asyncio.sleep(ShoutViewStorage.period)
class Shout(Base): class Shout(Base):
__tablename__ = 'shout' __tablename__ = 'shout'

View File

@ -1,5 +1,5 @@
from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\ from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\
rating_storage, view_storage rating_storage, ShoutViewStorage
from orm.base import local_session from orm.base import local_session
from resolvers.base import mutation, query from resolvers.base import mutation, query
@ -165,17 +165,6 @@ class TopShouts:
print("top shouts worker error = %s" % (err)) print("top shouts worker error = %s" % (err))
await asyncio.sleep(TopShouts.period) await asyncio.sleep(TopShouts.period)
async def db_flush_worker():
print("db flush worker start")
while True:
try:
print("flush changes")
with local_session() as session:
view_storage.flush_changes(session)
except Exception as err:
print("db flush worker error = %s" % (err))
await asyncio.sleep(30*60)
@query.field("topShoutsByView") @query.field("topShoutsByView")
async def top_shouts_by_view(_, info, limit): async def top_shouts_by_view(_, info, limit):
@ -297,7 +286,7 @@ async def rate_shout(_, info, shout_id, value):
@mutation.field("viewShout") @mutation.field("viewShout")
async def view_shout(_, info, shout_id): async def view_shout(_, info, shout_id):
view_storage.inc_view(shout_id) await ShoutViewStorage.inc_view(shout_id)
return {"error" : ""} return {"error" : ""}
@query.field("getShoutBySlug") @query.field("getShoutBySlug")
@ -311,5 +300,5 @@ async def get_shout_by_slug(_, info, slug):
options(select_options).\ options(select_options).\
filter(Shout.slug == slug).first() filter(Shout.slug == slug).first()
shout.rating = rating_storage.get_rating(shout.id) shout.rating = rating_storage.get_rating(shout.id)
shout.views = view_storage.get_view(shout.id) shout.views = await ShoutViewStorage.get_view(shout.id)
return shout return shout