chatUpdated subscription

This commit is contained in:
knst-kotov 2022-01-24 17:02:44 +03:00
parent ca9c3e0cee
commit 34b48cbc90
2 changed files with 31 additions and 20 deletions

View File

@ -10,6 +10,12 @@ from datetime import datetime
from redis import redis from redis import redis
class MessageSubscription:
queue = asyncio.Queue()
def __init__(self, chat_id):
self.chat_id = chat_id
class MessageSubscriptions: class MessageSubscriptions:
lock = asyncio.Lock() lock = asyncio.Lock()
subscriptions = [] subscriptions = []
@ -25,10 +31,11 @@ class MessageSubscriptions:
MessageSubscriptions.subscriptions.remove(subs) MessageSubscriptions.subscriptions.remove(subs)
@staticmethod @staticmethod
async def put(msg): async def put(message_result):
async with MessageSubscriptions.lock: async with MessageSubscriptions.lock:
for subs in MessageSubscriptions.subscriptions: for subs in MessageSubscriptions.subscriptions:
subs.put_nowait(msg) if message_result.message["chatId"] == subs.chat_id:
subs.queue.put_nowait(message_result)
class MessageResult: class MessageResult:
def __init__(self, status, message): def __init__(self, status, message):
@ -97,6 +104,9 @@ async def create_message(_, info, chatId, body, replyTo = None):
await redis.execute("LPUSH", f"chats/{chatId}/message_ids", str(message_id)) await redis.execute("LPUSH", f"chats/{chatId}/message_ids", str(message_id))
await redis.execute("SET", f"chats/{chatId}/next_message_id", str(message_id + 1)) await redis.execute("SET", f"chats/{chatId}/next_message_id", str(message_id + 1))
result = MessageResult("NEW", new_message)
await MessageSubscriptions.put(result)
return {"message" : new_message} return {"message" : new_message}
@query.field("getMessages") @query.field("getMessages")
@ -139,8 +149,8 @@ async def update_message(_, info, chatId, id, body):
await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message)) await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message))
#result = MessageResult("UPDATED", message) result = MessageResult("UPDATED", message)
#await MessageSubscriptions.put(result) await MessageSubscriptions.put(result)
return {"message" : message} return {"message" : message}
@ -153,29 +163,31 @@ async def delete_message(_, info, chatId, id):
if not chat: if not chat:
return { "error" : "chat not exist" } return { "error" : "chat not exist" }
count = await redis.execute("LREM", f"chats/{chatId}/message_ids", 0, str(id)) message = await redis.execute("GET", f"chats/{chatId}/messages/{id}")
if count == 0: if not message:
return { "error" : "message not exist" } return { "error" : "message not exist" }
message = json.loads(message)
await redis.execute("LREM", f"chats/{chatId}/message_ids", 0, str(id))
await redis.execute("DEL", f"chats/{chatId}/messages/{id}") await redis.execute("DEL", f"chats/{chatId}/messages/{id}")
#result = MessageResult("DELETED", message) result = MessageResult("DELETED", message)
#await MessageSubscriptions.put(result) await MessageSubscriptions.put(result)
return {} return {}
@subscription.source("messageChanged") @subscription.source("chatUpdated")
async def new_message_generator(obj, info): async def message_generator(obj, info, chatId):
try: try:
msg_queue = asyncio.Queue() subs = MessageSubscription(chatId)
await MessageSubscriptions.register_subscription(msg_queue) await MessageSubscriptions.register_subscription(subs)
while True: while True:
msg = await msg_queue.get() msg = await subs.queue.get()
yield msg yield msg
finally: finally:
await MessageSubscriptions.del_subscription(msg_queue) await MessageSubscriptions.del_subscription(subs)
@subscription.field("messageChanged") @subscription.field("chatUpdated")
def message_resolver(message, info): def message_resolver(message, info, chatId):
return message return message

View File

@ -186,8 +186,7 @@ type Query {
############################################ Subscription ############################################ Subscription
type Subscription { type Subscription {
messageChanged: MessageWithStatus! chatUpdated(chatId: String!): MessageWithStatus!
chatUpdated: MessageWithStatus!
onlineUpdated: [User!]! onlineUpdated: [User!]!
shoutUpdated: Shout! shoutUpdated: Shout!
userUpdated: User! userUpdated: User!