init-following-manager

This commit is contained in:
tonyrewin 2023-02-06 16:09:26 +03:00
parent e1e3e9fdde
commit 27acf62c2e
5 changed files with 130 additions and 56 deletions

View File

@ -7,8 +7,7 @@ from auth.authenticate import login_required
from auth.credentials import AuthCredentials
from base.redis import redis
from base.resolvers import mutation, subscription
from services.inbox.helpers import ChatFollowing, MessageResult
from services.inbox.storage import MessagesStorage
from services.following import FollowingManager, FollowingResult, Following
from validations.inbox import Message
@ -51,8 +50,8 @@ async def create_message(_, info, chat: str, body: str, replyTo=None):
"LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id)
)
result = MessageResult("NEW", new_message)
await MessagesStorage.put(result)
result = FollowingResult("NEW", 'chat', new_message)
await FollowingManager.put('chat', result)
return {
"message": new_message,
@ -82,8 +81,8 @@ async def update_message(_, info, chat_id: str, message_id: int, body: str):
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
result = MessageResult("UPDATED", message)
await MessagesStorage.put(result)
result = FollowingResult("UPDATED", 'chat', message)
await FollowingManager.put('chat', result)
return {
"message": message,
@ -115,8 +114,8 @@ async def delete_message(_, info, chat_id: str, message_id: int):
for user_id in users:
await redis.execute("LREM", f"chats/{chat_id}/unread/{user_id}", 0, str(message_id))
result = MessageResult("DELETED", message)
await MessagesStorage.put(result)
result = FollowingResult("DELETED", 'chat', message)
await FollowingManager.put(result)
return {}
@ -162,8 +161,8 @@ async def message_generator(_, info: GraphQLResolveInfo):
user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True)
for chat_id in user_following_chats_sorted:
following_chat = ChatFollowing(chat_id)
await MessagesStorage.register_chat(following_chat)
following_chat = Following('chat', chat_id)
await FollowingManager.register('chat', following_chat)
chat_task = following_chat.queue.get()
tasks.append(chat_task)
@ -171,7 +170,7 @@ async def message_generator(_, info: GraphQLResolveInfo):
msg = await asyncio.gather(*tasks)
yield msg
finally:
await MessagesStorage.remove_chat(following_chat)
await FollowingManager.remove('chat', following_chat)
@subscription.field("newMessage")

View File

@ -1,14 +1,35 @@
import asyncio
from base.orm import local_session
from base.resolvers import mutation, subscription, query
from auth.authenticate import login_required
from auth.credentials import AuthCredentials
from base.resolvers import mutation, subscription
# from resolvers.community import community_follow, community_unfollow
from orm.user import AuthorFollower
from orm.topic import TopicFollower
from orm.shout import Shout, ShoutReactionsFollower
from resolvers.zine.profile import author_follow, author_unfollow
from resolvers.zine.reactions import reactions_follow, reactions_unfollow
from resolvers.zine.topics import topic_follow, topic_unfollow
import asyncio
from services.following import Following, FollowingManager, FollowingResult
from graphql.type import GraphQLResolveInfo
@query.field("myFeed")
@login_required
async def get_my_feed(_, info):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
try:
with local_session() as session:
following_authors = session.query(AuthorFollower).where(AuthorFollower.follower == user_id).unique().all()
following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).unique().all()
# TODO: my feed query
shouts = []
return shouts
except Exception:
pass
@mutation.field("follow")
@login_required
async def follow(_, info, what, slug):
@ -17,13 +38,21 @@ async def follow(_, info, what, slug):
try:
if what == "AUTHOR":
author_follow(auth.user_id, slug)
result = FollowingResult("NEW", 'author', slug)
await FollowingManager.put('author', result)
elif what == "TOPIC":
topic_follow(auth.user_id, slug)
result = FollowingResult("NEW", 'topic', slug)
await FollowingManager.put('topic', result)
elif what == "COMMUNITY":
# community_follow(user, slug)
# result = FollowingResult("NEW", 'community', slug)
# await FollowingManager.put('community', result)
pass
elif what == "REACTIONS":
reactions_follow(auth.user_id, slug)
result = FollowingResult("NEW", 'shout', slug)
await FollowingManager.put('shout', result)
except Exception as e:
return {"error": str(e)}
@ -38,19 +67,28 @@ async def unfollow(_, info, what, slug):
try:
if what == "AUTHOR":
author_unfollow(auth.user_id, slug)
result = FollowingResult("DELETED", 'author', slug)
await FollowingManager.put('author', result)
elif what == "TOPIC":
topic_unfollow(auth.user_id, slug)
result = FollowingResult("DELETED", 'topic', slug)
await FollowingManager.put('topic', result)
elif what == "COMMUNITY":
# community_unfollow(user, slug)
# result = FollowingResult("DELETED", 'community', slug)
# await FollowingManager.put('community', result)
pass
elif what == "REACTIONS":
reactions_unfollow(auth.user_id, slug)
result = FollowingResult("DELETED", 'shout', slug)
await FollowingManager.put('shout', result)
except Exception as e:
return {"error": str(e)}
return {}
# by author and by topic
@subscription.source("newShout")
@login_required
async def shout_generator(_, info: GraphQLResolveInfo):
@ -60,7 +98,24 @@ async def shout_generator(_, info: GraphQLResolveInfo):
try:
tasks = []
# TODO: implement when noticing new shout
with local_session() as session:
following_authors = session.query(AuthorFollower).where(
AuthorFollower.follower == user_id).all()
following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).all()
# notify new shout
for topic_id in following_topics:
following_topic = Following('topic', topic_id)
await FollowingManager.register('topic', following_topic)
following_topic_task = following_topic.queue.get()
tasks.append(following_topic_task)
for author_id in following_authors:
following_author = Following('author', author_id)
await FollowingManager.register('author', following_author)
following_author_task = following_author.queue.get()
tasks.append(following_author_task)
while True:
shout = await asyncio.gather(*tasks)
@ -76,12 +131,21 @@ async def reaction_generator(_, info):
auth: AuthCredentials = info.context["request"].auth
user_id = auth.user_id
try:
tasks = []
with local_session() as session:
followings = session.query(ShoutReactionsFollower.shout).where(
ShoutReactionsFollower.follower == user_id).unique()
# TODO: implement when noticing new reaction
# notify new reaction
while True:
reaction = await asyncio.gather(*tasks)
yield reaction
tasks = []
for shout_id in followings:
following_shout = Following('shout', shout_id)
await FollowingManager.register('shout', following_shout)
following_author_task = following_shout.queue.get()
tasks.append(following_author_task)
while True:
reaction = await asyncio.gather(*tasks)
yield reaction
finally:
pass

48
services/following.py Normal file
View File

@ -0,0 +1,48 @@
import asyncio
class FollowingResult:
def __init__(self, event, kind, payload):
self.event = event
self.kind = kind
self.payload = payload
class Following:
queue = asyncio.Queue()
def __init__(self, kind, uid):
self.kind = kind # author topic shout chat
self.uid = uid
class FollowingManager:
lock = asyncio.Lock()
data = {
'author': [],
'topic': [],
'shout': [],
'chat': []
}
@staticmethod
async def register(kind, uid):
async with FollowingManager.lock:
FollowingManager[kind].append(uid)
@staticmethod
async def remove(kind, uid):
async with FollowingManager.lock:
FollowingManager[kind].remove(uid)
@staticmethod
async def push(kind, payload):
async with FollowingManager.lock:
if kind == 'chat':
for chat in FollowingManager['chat']:
if payload.message["chatId"] == chat.uid:
chat.queue.put_nowait(payload)
else:
for entity in FollowingManager[kind]:
if payload.shout['createdBy'] == entity.uid:
entity.queue.put_nowait(payload)

View File

@ -1,14 +0,0 @@
import asyncio
class MessageResult:
def __init__(self, status, message):
self.seen = status
self.message = message
class ChatFollowing:
queue = asyncio.Queue()
def __init__(self, chat_id):
self.chat_id = chat_id

View File

@ -1,23 +0,0 @@
import asyncio
class MessagesStorage:
lock = asyncio.Lock()
chats = []
@staticmethod
async def register_chat(chat):
async with MessagesStorage.lock:
MessagesStorage.chats.append(chat)
@staticmethod
async def remove_chat(chat):
async with MessagesStorage.lock:
MessagesStorage.chats.remove(chat)
@staticmethod
async def put(message_result):
async with MessagesStorage.lock:
for chat in MessagesStorage.chats:
if message_result.message["chatId"] == chat.chat_id:
chat.queue.put_nowait(message_result)