From 7f54db89394ffbd622d1d993a7299852b39e0704 Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Wed, 10 Nov 2021 11:42:29 +0300 Subject: [PATCH] topicUpdated --- orm/topic.py | 2 +- resolvers/topics.py | 21 +++++++++++++-------- resolvers/zine.py | 31 +++++++++++++++++++++++++++++++ schema.graphql | 4 ++-- 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/orm/topic.py b/orm/topic.py index 719c3732..c45eee98 100644 --- a/orm/topic.py +++ b/orm/topic.py @@ -15,7 +15,7 @@ class TopicSubscription(Base): __tablename__ = "topic_subscription" id = None - topic = Column(ForeignKey('topic.slug'), primary_key = True) + topic = Column(ForeignKey('topic.id'), primary_key = True) user = Column(ForeignKey('user.id'), primary_key = True) createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") diff --git a/resolvers/topics.py b/resolvers/topics.py index 3bef55f7..36627f4f 100644 --- a/resolvers/topics.py +++ b/resolvers/topics.py @@ -1,6 +1,7 @@ from orm import Topic, TopicSubscription from orm.base import local_session from resolvers.base import mutation, query, subscription +from resolvers.zine import ShoutSubscriptions from auth.authenticate import login_required import asyncio @@ -48,14 +49,18 @@ async def topic_unsubscribe(_, info, slug): @subscription.source("topicUpdated") async def new_shout_generator(obj, info, user_id): - with local_session() as session: - topics = session.query(TopicSubscription.topic).filter(TopicSubscription.user == user_id).all() - #TODO filter new shouts - while True: - new_shout = {"slug": "slug", "body" : "body"} - yield new_shout - await asyncio.sleep(30) - print("end") + try: + with local_session() as session: + topics = session.query(TopicSubscription.topic).filter(TopicSubscription.user == user_id).all() + topics = set([item.topic for item in topics]) + shouts_queue = asyncio.Queue() + await ShoutSubscriptions.register_subscription(shouts_queue) + while True: + shout = await shouts_queue.get() + if topics.intersection(set(shout.topic_ids)): + yield shout + finally: + await ShoutSubscriptions.del_subscription(shouts_queue) @subscription.field("topicUpdated") def shout_resolver(shout, info, user_id): diff --git a/resolvers/zine.py b/resolvers/zine.py index 3a7cd3c5..01167733 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -188,7 +188,27 @@ class ShoutsCache: print("shouts cache worker error = %s" % (err)) await asyncio.sleep(ShoutsCache.period) +class ShoutSubscriptions: + lock = asyncio.Lock() + subscriptions = [] + @staticmethod + async def register_subscription(subs): + async with ShoutSubscriptions.lock: + ShoutSubscriptions.subscriptions.append(subs) + + @staticmethod + async def del_subscription(subs): + async with ShoutSubscriptions.lock: + ShoutSubscriptions.subscriptions.remove(subs) + + @staticmethod + async def send_shout(shout): + async with ShoutSubscriptions.lock: + for subs in ShoutSubscriptions.subscriptions: + subs.put_nowait(shout) + + @query.field("topViewed") async def top_viewed(_, info, limit): async with ShoutsCache.lock: @@ -224,11 +244,20 @@ async def create_shout(_, info, input): with local_session() as session: user = session.query(User).filter(User.id == user_id).first() + + topic_ids = input.get("topic_ids") + del input["topic_ids"] new_shout = Shout.create(**input) ShoutAuthor.create( shout = new_shout.id, user = user_id) + + for id in topic_ids: + topic = ShoutTopic.create( + shout = new_shout.id, + topic = id) + new_shout.topic_ids = topic_ids task = GitTask( input, @@ -236,6 +265,8 @@ async def create_shout(_, info, input): user.email, "new shout %s" % (new_shout.slug) ) + + await ShoutSubscriptions.send_shout(new_shout) return { "shout" : new_shout diff --git a/schema.graphql b/schema.graphql index 26dec28b..348ca252 100644 --- a/schema.graphql +++ b/schema.graphql @@ -27,7 +27,7 @@ input ShoutInput { body: String! # replyTo: String # another shout # tags: [String] # actual values - topics: [String] + topic_ids: [Int] title: String subtitle: String versionOf: String @@ -239,7 +239,7 @@ type Shout { # replyTo: Shout versionOf: Shout tags: [String] # actual values - topics: [String] # topic-slugs, order has matter + topics: [Topic] title: String subtitle: String updatedAt: DateTime