diff --git a/resolvers/inbox.py b/resolvers/inbox.py index 1e2aad03..87b19f1a 100644 --- a/resolvers/inbox.py +++ b/resolvers/inbox.py @@ -10,6 +10,12 @@ from datetime import datetime from redis import redis +class MessageSubscription: + queue = asyncio.Queue() + + def __init__(self, chat_id): + self.chat_id = chat_id + class MessageSubscriptions: lock = asyncio.Lock() subscriptions = [] @@ -25,10 +31,11 @@ class MessageSubscriptions: MessageSubscriptions.subscriptions.remove(subs) @staticmethod - async def put(msg): + async def put(message_result): async with MessageSubscriptions.lock: for subs in MessageSubscriptions.subscriptions: - subs.put_nowait(msg) + if message_result.message["chatId"] == subs.chat_id: + subs.queue.put_nowait(message_result) class MessageResult: def __init__(self, status, message): @@ -97,6 +104,9 @@ async def create_message(_, info, chatId, body, replyTo = None): await redis.execute("LPUSH", f"chats/{chatId}/message_ids", str(message_id)) await redis.execute("SET", f"chats/{chatId}/next_message_id", str(message_id + 1)) + result = MessageResult("NEW", new_message) + await MessageSubscriptions.put(result) + return {"message" : new_message} @query.field("getMessages") @@ -139,8 +149,8 @@ async def update_message(_, info, chatId, id, body): await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message)) - #result = MessageResult("UPDATED", message) - #await MessageSubscriptions.put(result) + result = MessageResult("UPDATED", message) + await MessageSubscriptions.put(result) return {"message" : message} @@ -153,29 +163,31 @@ async def delete_message(_, info, chatId, id): if not chat: return { "error" : "chat not exist" } - count = await redis.execute("LREM", f"chats/{chatId}/message_ids", 0, str(id)) - if count == 0: - return { "error" : "message not exist" } + message = await redis.execute("GET", f"chats/{chatId}/messages/{id}") + if not message: + return { "error" : "message not exist" } + message = json.loads(message) + await redis.execute("LREM", f"chats/{chatId}/message_ids", 0, str(id)) await redis.execute("DEL", f"chats/{chatId}/messages/{id}") - #result = MessageResult("DELETED", message) - #await MessageSubscriptions.put(result) - + result = MessageResult("DELETED", message) + await MessageSubscriptions.put(result) + return {} -@subscription.source("messageChanged") -async def new_message_generator(obj, info): +@subscription.source("chatUpdated") +async def message_generator(obj, info, chatId): try: - msg_queue = asyncio.Queue() - await MessageSubscriptions.register_subscription(msg_queue) + subs = MessageSubscription(chatId) + await MessageSubscriptions.register_subscription(subs) while True: - msg = await msg_queue.get() + msg = await subs.queue.get() yield msg finally: - await MessageSubscriptions.del_subscription(msg_queue) + await MessageSubscriptions.del_subscription(subs) -@subscription.field("messageChanged") -def message_resolver(message, info): +@subscription.field("chatUpdated") +def message_resolver(message, info, chatId): return message diff --git a/schema.graphql b/schema.graphql index 5fa7baf3..8bb3c985 100644 --- a/schema.graphql +++ b/schema.graphql @@ -186,8 +186,7 @@ type Query { ############################################ Subscription type Subscription { - messageChanged: MessageWithStatus! - chatUpdated: MessageWithStatus! + chatUpdated(chatId: String!): MessageWithStatus! onlineUpdated: [User!]! shoutUpdated: Shout! userUpdated: User!