From 7dea19495c6ec92e70104234b644285ac7242a2c Mon Sep 17 00:00:00 2001 From: knst-kotov Date: Wed, 24 Nov 2021 10:36:06 +0300 Subject: [PATCH] one subscription on messages --- resolvers/inbox.py | 64 +++++++++++++++++++++++++++------------------- schema.graphql | 15 ++++++++--- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/resolvers/inbox.py b/resolvers/inbox.py index a6f710cb..07a7f6fa 100644 --- a/resolvers/inbox.py +++ b/resolvers/inbox.py @@ -7,12 +7,30 @@ from auth.authenticate import login_required import asyncio +class MessageSubscriptions: + lock = asyncio.Lock() + subscriptions = [] -class MessageQueue: + @staticmethod + async def register_subscription(subs): + async with MessageSubscriptions.lock: + MessageSubscriptions.subscriptions.append(subs) - new_message = asyncio.Queue() - updated_message = asyncio.Queue() - deleted_message = asyncio.Queue() + @staticmethod + async def del_subscription(subs): + async with MessageSubscriptions.lock: + MessageSubscriptions.subscriptions.remove(subs) + + @staticmethod + async def put(msg): + async with MessageSubscriptions.lock: + for subs in MessageSubscriptions.subscriptions: + subs.put_nowait(msg) + +class MessageResult: + def __init__(self, status, message): + self.status = status + self.message = message @mutation.field("createMessage") @@ -27,7 +45,8 @@ async def create_message(_, info, body, replyTo = None): replyTo = replyTo ) - MessageQueue.new_message.put_nowait(new_message) + result = MessageResult("NEW", new_message) + await MessageSubscriptions.put(result) return {"message" : new_message} @@ -68,7 +87,8 @@ async def update_message(_, info, id, body): message.body = body session.commit() - MessageQueue.updated_message.put_nowait(message) + result = MessageResult("UPDATED", message) + await MessageSubscriptions.put(result) return {"message" : message} @@ -87,31 +107,23 @@ async def delete_message(_, info, id): session.delete(message) session.commit() - MessageQueue.deleted_message.put_nowait(message) + result = MessageResult("DELETED", message) + await MessageSubscriptions.put(result) return {} -@subscription.source("messageCreated") +@subscription.source("messageChanged") async def new_message_generator(obj, info): - while True: - new_message = await MessageQueue.new_message.get() - yield new_message + try: + msg_queue = asyncio.Queue() + await MessageSubscriptions.register_subscription(msg_queue) + while True: + msg = await msg_queue.get() + yield msg + finally: + await MessageSubscriptions.del_subscription(msg_queue) -@subscription.source("messageUpdated") -async def updated_message_generator(obj, info): - while True: - message = await MessageQueue.updated_message.get() - yield message - -@subscription.source("messageDeleted") -async def deleted_message_generator(obj, info): - while True: - message = await MessageQueue.deleted_message.get() - yield new_message - -@subscription.field("messageCreated") -@subscription.field("messageUpdated") -@subscription.field("messageDeleted") +@subscription.field("messageChanged") def message_resolver(message, info): return message diff --git a/schema.graphql b/schema.graphql index 18a37831..179cb1fd 100644 --- a/schema.graphql +++ b/schema.graphql @@ -51,6 +51,17 @@ type CommentResult { comment: Comment } +enum MessageStatus { + NEW + UPDATED + DELETED +} + +type MessageWithStatus { + status: MessageStatus! + message: Message! +} + ################################### Mutation type Mutation { @@ -132,9 +143,7 @@ type Query { ############################################ Subscription type Subscription { - messageCreated: Message! - messageUpdated: Message! - messageDeleted: Message! + messageChanged: MessageWithStatus! onlineUpdated: [User!]! shoutUpdated: Shout!