diff --git a/main.py b/main.py index f0e2503b..3cdf0fd7 100644 --- a/main.py +++ b/main.py @@ -13,6 +13,7 @@ from sentry_sdk.integrations.starlette import StarletteIntegration from starlette.applications import Starlette from starlette.routing import Route +from services.following import FollowingManager from services.rediscache import redis from services.schema import resolvers from services.search import search_service @@ -35,6 +36,9 @@ async def start_up(): # start viewed service await ViewedStorage.init() + # preload following data + await FollowingManager.preload() + # start search service search_service.info() diff --git a/services/following.py b/services/following.py index 3ce3d06f..e2a92aca 100644 --- a/services/following.py +++ b/services/following.py @@ -1,4 +1,18 @@ import asyncio +import logging +import time + +from orm.author import AuthorFollower +from orm.shout import ShoutReactionsFollower +from orm.topic import TopicFollower +from services.db import local_session + + +logger = logging.getLogger('[services.following] ') +logger.setLevel(logging.DEBUG) + + +MODEL_CLASSES = {'author': AuthorFollower, 'topic': TopicFollower, 'shout': ShoutReactionsFollower} class FollowingResult: @@ -9,39 +23,96 @@ class FollowingResult: class Following: - queue = asyncio.Queue() - def __init__(self, kind, uid): - self.kind = kind # author topic shout community + self.kind = kind # author, topic, shout self.uid = uid + self.queue = asyncio.Queue() class FollowingManager: lock = asyncio.Lock() - followers_by_kind = {} - data = {'author': [], 'topic': [], 'shout': [], 'community': []} + followers_by_kind = {'author': [], 'topic': [], 'shout': []} + authors_by_follower = {} + topics_by_follower = {} + shouts_by_follower = {} + + @staticmethod + async def preload(): + logger.info(' preloading started...') + ts = int(time.time()) + async with FollowingManager.lock: + with local_session() as session: + # Load followers_by_kind + for kind in FollowingManager.followers_by_kind.keys(): + model_class = MODEL_CLASSES[kind] + followers = session.query(model_class.follower).distinct().all() + FollowingManager.followers_by_kind[kind] = [follower[0] for follower in followers] + + # Load authors_by_follower + for following in session.query(AuthorFollower).all(): + FollowingManager.authors_by_follower[following.follower] = FollowingManager.authors_by_follower.get( + following.follower, [] + ) + FollowingManager.authors_by_follower[following.follower].append(following.author) + + # Load topics_by_follower + for following in session.query(TopicFollower).all(): + FollowingManager.topics_by_follower[following.follower] = FollowingManager.topics_by_follower.get( + following.follower, [] + ) + FollowingManager.topics_by_follower[following.follower].append(following.topic) + + # Load shouts_by_follower + for following in session.query(ShoutReactionsFollower).all(): + FollowingManager.shouts_by_follower[following.follower] = FollowingManager.shouts_by_follower.get( + following.follower, [] + ) + FollowingManager.shouts_by_follower[following.follower].append(following.shout) + logger.info(f' preloading finished at {(int(time.time()) - ts)/1000} secs') @staticmethod async def register(kind, uid): async with FollowingManager.lock: - FollowingManager.followers_by_kind[kind] = FollowingManager.followers_by_kind.get(kind, []) - FollowingManager.followers_by_kind[kind].append(uid) + if uid not in FollowingManager.followers_by_kind[kind]: + FollowingManager.followers_by_kind[kind].append(uid) @staticmethod async def remove(kind, uid): async with FollowingManager.lock: - followings = FollowingManager.followers_by_kind.get(kind) - if followings: - followings.remove(uid) - FollowingManager.followers_by_kind[kind] = followings + FollowingManager.followers_by_kind[kind] = [ + follower for follower in FollowingManager.followers_by_kind[kind] if follower != uid + ] @staticmethod async def push(kind, payload): try: async with FollowingManager.lock: - entities = FollowingManager.followers_by_kind.get(kind, []) - for entity in entities[:]: # Use a copy to iterate - if payload.shout['created_by'] == entity.uid: - entity.queue.put_nowait(payload) + for entity in FollowingManager.followers_by_kind[kind]: + if payload.shout['created_by'] == entity: + await entity.queue.put(payload) except Exception as e: - print(Exception(e)) + print(f'Error in push method: {e}') + + @staticmethod + async def get_followers_by_kind(kind, target_id=None): + async with FollowingManager.lock: + return ( + FollowingManager.followers_by_kind[kind][target_id] + if target_id + else FollowingManager.followers_by_kind[kind] + ) + + @staticmethod + async def get_authors_for(follower_id): + async with FollowingManager.lock: + return FollowingManager.authors_by_follower.get(follower_id, []) + + @staticmethod + async def get_topics_for(follower_id): + async with FollowingManager.lock: + return FollowingManager.topics_by_follower.get(follower_id, []) + + @staticmethod + async def get_shouts_for(follower_id): + async with FollowingManager.lock: + return FollowingManager.shouts_by_follower.get(follower_id, [])