diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 3a6936d0..d33af9c3 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -7,8 +7,7 @@ from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.resolvers import mutation, subscription -from services.inbox.helpers import ChatFollowing, MessageResult -from services.inbox.storage import MessagesStorage +from services.following import FollowingManager, FollowingResult, Following from validations.inbox import Message @@ -51,8 +50,8 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): "LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id) ) - result = MessageResult("NEW", new_message) - await MessagesStorage.put(result) + result = FollowingResult("NEW", 'chat', new_message) + await FollowingManager.put('chat', result) return { "message": new_message, @@ -82,8 +81,8 @@ async def update_message(_, info, chat_id: str, message_id: int, body: str): await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) - result = MessageResult("UPDATED", message) - await MessagesStorage.put(result) + result = FollowingResult("UPDATED", 'chat', message) + await FollowingManager.put('chat', result) return { "message": message, @@ -115,8 +114,8 @@ async def delete_message(_, info, chat_id: str, message_id: int): for user_id in users: await redis.execute("LREM", f"chats/{chat_id}/unread/{user_id}", 0, str(message_id)) - result = MessageResult("DELETED", message) - await MessagesStorage.put(result) + result = FollowingResult("DELETED", 'chat', message) + await FollowingManager.put(result) return {} @@ -162,8 +161,8 @@ async def message_generator(_, info: GraphQLResolveInfo): user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True) for chat_id in user_following_chats_sorted: - following_chat = ChatFollowing(chat_id) - await MessagesStorage.register_chat(following_chat) + following_chat = Following('chat', chat_id) + await FollowingManager.register('chat', following_chat) chat_task = following_chat.queue.get() tasks.append(chat_task) @@ -171,7 +170,7 @@ async def message_generator(_, info: GraphQLResolveInfo): msg = await asyncio.gather(*tasks) yield msg finally: - await MessagesStorage.remove_chat(following_chat) + await FollowingManager.remove('chat', following_chat) @subscription.field("newMessage") diff --git a/resolvers/zine/following.py b/resolvers/zine/following.py index f6acff43..72c3a0e9 100644 --- a/resolvers/zine/following.py +++ b/resolvers/zine/following.py @@ -1,14 +1,35 @@ +import asyncio +from base.orm import local_session +from base.resolvers import mutation, subscription, query from auth.authenticate import login_required from auth.credentials import AuthCredentials -from base.resolvers import mutation, subscription # from resolvers.community import community_follow, community_unfollow +from orm.user import AuthorFollower +from orm.topic import TopicFollower +from orm.shout import Shout, ShoutReactionsFollower from resolvers.zine.profile import author_follow, author_unfollow from resolvers.zine.reactions import reactions_follow, reactions_unfollow from resolvers.zine.topics import topic_follow, topic_unfollow -import asyncio +from services.following import Following, FollowingManager, FollowingResult from graphql.type import GraphQLResolveInfo +@query.field("myFeed") +@login_required +async def get_my_feed(_, info): + auth: AuthCredentials = info.context["request"].auth + user_id = auth.user_id + try: + with local_session() as session: + following_authors = session.query(AuthorFollower).where(AuthorFollower.follower == user_id).unique().all() + following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).unique().all() + # TODO: my feed query + shouts = [] + return shouts + except Exception: + pass + + @mutation.field("follow") @login_required async def follow(_, info, what, slug): @@ -17,13 +38,21 @@ async def follow(_, info, what, slug): try: if what == "AUTHOR": author_follow(auth.user_id, slug) + result = FollowingResult("NEW", 'author', slug) + await FollowingManager.put('author', result) elif what == "TOPIC": topic_follow(auth.user_id, slug) + result = FollowingResult("NEW", 'topic', slug) + await FollowingManager.put('topic', result) elif what == "COMMUNITY": # community_follow(user, slug) + # result = FollowingResult("NEW", 'community', slug) + # await FollowingManager.put('community', result) pass elif what == "REACTIONS": reactions_follow(auth.user_id, slug) + result = FollowingResult("NEW", 'shout', slug) + await FollowingManager.put('shout', result) except Exception as e: return {"error": str(e)} @@ -38,19 +67,28 @@ async def unfollow(_, info, what, slug): try: if what == "AUTHOR": author_unfollow(auth.user_id, slug) + result = FollowingResult("DELETED", 'author', slug) + await FollowingManager.put('author', result) elif what == "TOPIC": topic_unfollow(auth.user_id, slug) + result = FollowingResult("DELETED", 'topic', slug) + await FollowingManager.put('topic', result) elif what == "COMMUNITY": # community_unfollow(user, slug) + # result = FollowingResult("DELETED", 'community', slug) + # await FollowingManager.put('community', result) pass elif what == "REACTIONS": reactions_unfollow(auth.user_id, slug) + result = FollowingResult("DELETED", 'shout', slug) + await FollowingManager.put('shout', result) except Exception as e: return {"error": str(e)} return {} +# by author and by topic @subscription.source("newShout") @login_required async def shout_generator(_, info: GraphQLResolveInfo): @@ -60,7 +98,24 @@ async def shout_generator(_, info: GraphQLResolveInfo): try: tasks = [] - # TODO: implement when noticing new shout + with local_session() as session: + following_authors = session.query(AuthorFollower).where( + AuthorFollower.follower == user_id).all() + following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).all() + + # notify new shout + + for topic_id in following_topics: + following_topic = Following('topic', topic_id) + await FollowingManager.register('topic', following_topic) + following_topic_task = following_topic.queue.get() + tasks.append(following_topic_task) + + for author_id in following_authors: + following_author = Following('author', author_id) + await FollowingManager.register('author', following_author) + following_author_task = following_author.queue.get() + tasks.append(following_author_task) while True: shout = await asyncio.gather(*tasks) @@ -76,12 +131,21 @@ async def reaction_generator(_, info): auth: AuthCredentials = info.context["request"].auth user_id = auth.user_id try: - tasks = [] + with local_session() as session: + followings = session.query(ShoutReactionsFollower.shout).where( + ShoutReactionsFollower.follower == user_id).unique() - # TODO: implement when noticing new reaction + # notify new reaction - while True: - reaction = await asyncio.gather(*tasks) - yield reaction + tasks = [] + for shout_id in followings: + following_shout = Following('shout', shout_id) + await FollowingManager.register('shout', following_shout) + following_author_task = following_shout.queue.get() + tasks.append(following_author_task) + + while True: + reaction = await asyncio.gather(*tasks) + yield reaction finally: pass diff --git a/services/following.py b/services/following.py new file mode 100644 index 00000000..f67f78a2 --- /dev/null +++ b/services/following.py @@ -0,0 +1,48 @@ +import asyncio + + +class FollowingResult: + def __init__(self, event, kind, payload): + self.event = event + self.kind = kind + self.payload = payload + + +class Following: + queue = asyncio.Queue() + + def __init__(self, kind, uid): + self.kind = kind # author topic shout chat + self.uid = uid + + +class FollowingManager: + lock = asyncio.Lock() + data = { + 'author': [], + 'topic': [], + 'shout': [], + 'chat': [] + } + + @staticmethod + async def register(kind, uid): + async with FollowingManager.lock: + FollowingManager[kind].append(uid) + + @staticmethod + async def remove(kind, uid): + async with FollowingManager.lock: + FollowingManager[kind].remove(uid) + + @staticmethod + async def push(kind, payload): + async with FollowingManager.lock: + if kind == 'chat': + for chat in FollowingManager['chat']: + if payload.message["chatId"] == chat.uid: + chat.queue.put_nowait(payload) + else: + for entity in FollowingManager[kind]: + if payload.shout['createdBy'] == entity.uid: + entity.queue.put_nowait(payload) diff --git a/services/inbox/helpers.py b/services/inbox/helpers.py deleted file mode 100644 index d8791218..00000000 --- a/services/inbox/helpers.py +++ /dev/null @@ -1,14 +0,0 @@ -import asyncio - - -class MessageResult: - def __init__(self, status, message): - self.seen = status - self.message = message - - -class ChatFollowing: - queue = asyncio.Queue() - - def __init__(self, chat_id): - self.chat_id = chat_id diff --git a/services/inbox/storage.py b/services/inbox/storage.py deleted file mode 100644 index dd6e5fcf..00000000 --- a/services/inbox/storage.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio - - -class MessagesStorage: - lock = asyncio.Lock() - chats = [] - - @staticmethod - async def register_chat(chat): - async with MessagesStorage.lock: - MessagesStorage.chats.append(chat) - - @staticmethod - async def remove_chat(chat): - async with MessagesStorage.lock: - MessagesStorage.chats.remove(chat) - - @staticmethod - async def put(message_result): - async with MessagesStorage.lock: - for chat in MessagesStorage.chats: - if message_result.message["chatId"] == chat.chat_id: - chat.queue.put_nowait(message_result)