From c32fc435385e22727327e34ec3900040ac501678 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Wed, 29 Sep 2021 15:59:48 +0300 Subject: [PATCH] ShoutViewStorage refactor --- main.py | 6 ++-- orm/__init__.py | 2 +- orm/shout.py | 72 +++++++++++++++++++++++++++++++++-------------- resolvers/zine.py | 17 ++--------- 4 files changed, 59 insertions(+), 38 deletions(-) diff --git a/main.py b/main.py index faedf1e7..e10719f7 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,9 @@ from auth.oauth import oauth_login, oauth_authorize from auth.email import email_authorize from redis import redis from resolvers.base import resolvers -from resolvers.zine import GitTask, TopShouts, db_flush_worker +from resolvers.zine import GitTask, TopShouts + +from orm.shout import ShoutViewStorage import asyncio @@ -29,7 +31,7 @@ async def start_up(): await redis.connect() git_task = asyncio.create_task(GitTask.git_task_worker()) top_shouts_task = asyncio.create_task(TopShouts.worker()) - db_flush_task = asyncio.create_task(db_flush_worker()) + view_storage_task = asyncio.create_task(ShoutViewStorage.worker()) async def shutdown(): await redis.disconnect() diff --git a/orm/__init__.py b/orm/__init__.py index bd267234..d2e94d21 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -17,4 +17,4 @@ Resource.init_table() with local_session() as session: rating_storage = ShoutRatingStorage(session) - view_storage = ShoutViewStorage(session) + ShoutViewStorage.init(session) diff --git a/orm/shout.py b/orm/shout.py index a550c669..ce26bceb 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -1,14 +1,16 @@ from typing import List -from datetime import datetime +from datetime import datetime, timedelta from sqlalchemy import Table, Column, Integer, String, ForeignKey, DateTime, Boolean from sqlalchemy.orm import relationship from sqlalchemy.orm.attributes import flag_modified from orm import Permission, User, Topic from orm.comment import Comment -from orm.base import Base +from orm.base import Base, local_session from functools import reduce +import asyncio + class ShoutAuthor(Base): __tablename__ = "shout_author" @@ -68,9 +70,17 @@ class ShoutViewByDay(Base): class ShoutViewStorage: - def __init__(self, session): + views = [] + this_day_views = {} + + period = 30*60 #sec + + lock = asyncio.Lock() + + @staticmethod + def init(session): + self = ShoutViewStorage self.views = session.query(ShoutViewByDay).all() - self.this_day_views = {} for view in self.views: shout_id = view.shout_id if not shout_id in self.this_day_views: @@ -79,28 +89,48 @@ class ShoutViewStorage: if this_day_view.day < view.day: self.this_day_views[shout_id] = view - def get_view(self, shout_id): - shout_views = list(filter(lambda x: x.shout_id == shout_id, self.views)) + @staticmethod + async def get_view(shout_id): + async with ShoutViewStorage.lock: + shout_views = list(filter(lambda x: x.shout_id == shout_id, ShoutViewStorage.views)) return reduce((lambda x, y: x + y.value), shout_views, 0) - def inc_view(self, shout_id): - this_day_view = self.this_day_views.get(shout_id) - if not this_day_view: - this_day_view = ShoutViewByDay.create(shout_id = shout_id, value = 1) - self.this_day_views[shout_id] = this_day_view - self.views.append(this_day_view) - else: - this_day_view.value = this_day_view.value + 1 - this_day_view.modified = True + @staticmethod + async def inc_view(shout_id): + self = ShoutViewStorage + async with ShoutViewStorage.lock: + this_day_view = self.this_day_views.get(shout_id) + day_start = datetime.now().replace(hour = 0, minute = 0, second = 0) + if not this_day_view or this_day_view.day < day_start: + this_day_view = ShoutViewByDay.create(shout_id = shout_id, value = 1) + self.this_day_views[shout_id] = this_day_view + self.views.append(this_day_view) + else: + this_day_view.value = this_day_view.value + 1 + this_day_view.modified = True - def flush_changes(self, session): - for view in self.this_day_views.values(): - if getattr(view, "modified", False): - session.add(view) - flag_modified(view, "value") - view.modified = False + @staticmethod + async def flush_changes(session): + async with ShoutViewStorage.lock: + for view in ShoutViewStorage.this_day_views.values(): + if getattr(view, "modified", False): + session.add(view) + flag_modified(view, "value") + view.modified = False session.commit() + @staticmethod + async def worker(): + print("ShoutViewStorage worker start") + while True: + try: + print("ShoutViewStorage worker: flush changes") + with local_session() as session: + await ShoutViewStorage.flush_changes(session) + except Exception as err: + print("ShoutViewStorage worker: error = %s" % (err)) + await asyncio.sleep(ShoutViewStorage.period) + class Shout(Base): __tablename__ = 'shout' diff --git a/resolvers/zine.py b/resolvers/zine.py index 59dbaeec..7703bd24 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -1,5 +1,5 @@ from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\ - rating_storage, view_storage + rating_storage, ShoutViewStorage from orm.base import local_session from resolvers.base import mutation, query @@ -165,17 +165,6 @@ class TopShouts: print("top shouts worker error = %s" % (err)) await asyncio.sleep(TopShouts.period) -async def db_flush_worker(): - print("db flush worker start") - while True: - try: - print("flush changes") - with local_session() as session: - view_storage.flush_changes(session) - except Exception as err: - print("db flush worker error = %s" % (err)) - await asyncio.sleep(30*60) - @query.field("topShoutsByView") async def top_shouts_by_view(_, info, limit): @@ -297,7 +286,7 @@ async def rate_shout(_, info, shout_id, value): @mutation.field("viewShout") async def view_shout(_, info, shout_id): - view_storage.inc_view(shout_id) + await ShoutViewStorage.inc_view(shout_id) return {"error" : ""} @query.field("getShoutBySlug") @@ -311,5 +300,5 @@ async def get_shout_by_slug(_, info, slug): options(select_options).\ filter(Shout.slug == slug).first() shout.rating = rating_storage.get_rating(shout.id) - shout.views = view_storage.get_view(shout.id) + shout.views = await ShoutViewStorage.get_view(shout.id) return shout