From 73797752f2bcf246998be4180c2756ffbbb46b95 Mon Sep 17 00:00:00 2001 From: Untone Date: Fri, 24 Nov 2023 05:18:02 +0300 Subject: [PATCH] notifier-draft-2 --- .gitea/workflows/main.yml | 22 +++++ README.md | 12 ++- main.py | 7 +- notifier.graphql | 0 resolvers/notifications.py | 9 +- services/auth.py | 2 +- services/core.py | 35 +++++++- services/keeper.py | 164 +++++++++++++++++++++++++++++++++++++ services/listener.py | 43 ++++++++++ 9 files changed, 283 insertions(+), 11 deletions(-) create mode 100644 .gitea/workflows/main.yml create mode 100644 notifier.graphql create mode 100644 services/keeper.py create mode 100644 services/listener.py diff --git a/.gitea/workflows/main.yml b/.gitea/workflows/main.yml new file mode 100644 index 0000000..c88393d --- /dev/null +++ b/.gitea/workflows/main.yml @@ -0,0 +1,22 @@ +name: 'deploy' +on: [push] + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Cloning repo + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Get Repo Name + id: repo_name + run: echo "::set-output name=repo::$(echo ${GITHUB_REPOSITORY##*/})" + + - name: Push to dokku + uses: dokku/github-action@master + with: + branch: 'main' + git_remote_url: 'ssh://dokku@staging.discours.io:22/${{ steps.repo_name.outputs.repo }}' + ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }} \ No newline at end of file diff --git a/README.md b/README.md index f88409e..b678172 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,13 @@ # notifier -собирает уведомления и формирует дайджесты \ No newline at end of file +### Что делает + + - слушает redis PubSub каналы реакций и постов + - собирает уведомления + - формирует дайджесты + + +### Что НЕ делает + + - не отправляет сообщения по SSE + - не определяет кому их отправлять diff --git a/main.py b/main.py index 0be786b..0f3461f 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import os +import asyncio from importlib import import_module from os.path import exists @@ -8,10 +9,11 @@ from starlette.applications import Starlette from services.schema import resolvers from services.rediscache import redis +from services.keeper import notification_service from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE import_module("resolvers") -schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers) # type: ignore +schema = make_executable_schema(load_schema_from_path("notifier.graphql"), resolvers) # type: ignore async def start_up(): @@ -24,6 +26,9 @@ async def start_up(): f.write(str(os.getpid())) else: await redis.connect() + notification_service_task = asyncio.create_task(notification_service.worker()) + print(f"[main] {notification_service_task}") + try: import sentry_sdk diff --git a/notifier.graphql b/notifier.graphql new file mode 100644 index 0000000..e69de29 diff --git a/resolvers/notifications.py b/resolvers/notifications.py index 81b91e1..f8d7b82 100644 --- a/resolvers/notifications.py +++ b/resolvers/notifications.py @@ -25,11 +25,11 @@ async def load_notifications(_, info, params=None): notifications = [] with local_session() as session: - total_count = session.query(Notification).where(Notification.author == author_id).count() + total_count = session.query(Notification).where(Notification.user == user_id).count() total_unread_count = ( session.query(Notification) - .where(and_(Notification.author == author_id, Notification.seen == False)) # noqa: E712 + .where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712 .count() ) @@ -47,7 +47,7 @@ async def load_notifications(_, info, params=None): @mutation.field("markNotificationAsRead") @login_required async def mark_notification_as_read(_, info, notification_id: int): - author_id = info.context["author_id"] + user_id = info.context["user_id"] with local_session() as session: notification = ( @@ -64,8 +64,7 @@ async def mark_notification_as_read(_, info, notification_id: int): @mutation.field("markAllNotificationsAsRead") @login_required async def mark_all_notifications_as_read(_, info): - auth: AuthCredentials = info.context["request"].auth - user_id = auth.user_id + user_id = info.context["user_id"] statement = ( update(Notification) diff --git a/services/auth.py b/services/auth.py index 24234bc..e3a956a 100644 --- a/services/auth.py +++ b/services/auth.py @@ -58,7 +58,7 @@ def auth_request(f): if not is_authenticated: raise HTTPError("please, login first") else: - req["author_id"] = user_id + req["user_id"] = user_id return await f(*args, **kwargs) return decorated_function diff --git a/services/core.py b/services/core.py index 768caac..0fc4660 100644 --- a/services/core.py +++ b/services/core.py @@ -10,7 +10,7 @@ async def get_all_authors() -> List[ChatMember]: query_name = "authorsAll" query_type = "query" operation = "AuthorsAll" - query_fields = "id slug userpic name" + query_fields = "id slug pic name" gql = { "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }", @@ -38,10 +38,10 @@ async def get_my_followings() -> List[ChatMember]: query_name = "loadMySubscriptions" query_type = "query" operation = "LoadMySubscriptions" - query_fields = "id slug userpic name" + query_fields = "id slug pic name" gql = { - "query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }", + "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }", "operationName": operation, "variables": None, } @@ -60,3 +60,32 @@ async def get_my_followings() -> List[ChatMember]: traceback.print_exc() return [] + + +async def get_author(author_id) -> Author: + query_name = "getAuthor" + query_type = "query" + operation = "GetAuthor" + query_fields = "id slug pic name" + + gql = { + "query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }", + "operationName": operation, + "variables": None, + } + + async with AsyncClient() as client: + try: + response = await client.post(API_BASE, headers=headers, json=gql) + print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes") + if response.status_code != 200: + return [] + r = response.json() + if r: + return r.get("data", {}).get(query_name, {}) + else: + raise Exception("json response error") + except Exception: + import traceback + + traceback.print_exc() diff --git a/services/keeper.py b/services/keeper.py new file mode 100644 index 0000000..8721d2c --- /dev/null +++ b/services/keeper.py @@ -0,0 +1,164 @@ +import asyncio +import json +from datetime import datetime, timezone + +from sqlalchemy import and_ + +from services.db import local_session +from orm.shout import Shout +from orm.author import Author +from orm.user import User +from orm.notification import NotificationType, Notification +from orm.reaction import ReactionKind, Reaction + + +def shout_to_shout_data(shout): + return {"title": shout.title, "slug": shout.slug} + + +def user_to_user_data(user): + return {"id": user.id, "name": user.name, "slug": user.slug, "userpic": user.userpic} + + +def update_prev_notification(notification, user, reaction): + notification_data = json.loads(notification.data) + + notification_data["users"] = [u for u in notification_data["users"] if u["id"] != user.id] + notification_data["users"].append(user_to_user_data(user)) + + if notification_data["reactionIds"] is None: + notification_data["reactionIds"] = [] + notification_data["reactionIds"].append(reaction.id) + + notification.data = json.dumps(notification_data, ensure_ascii=False) + notification.seen = False + notification.occurrences = notification.occurrences + 1 + notification.createdAt = datetime.now(tz=timezone.utc) + + +class NewReactionNotificator: + def __init__(self, reaction_id): + self.reaction_id = reaction_id + + async def run(self): + with local_session() as session: + reaction = session.query(Reaction).where(Reaction.id == self.reaction_id).one() + shout = session.query(Shout).where(Shout.id == reaction.shout).one() + user = session.query(User).where(User.id == reaction.created_by).one() + notify_user_ids = [] + + if reaction.kind == ReactionKind.COMMENT: + parent_reaction = None + if reaction.replyTo: + parent_reaction = session.query(Reaction).where(Reaction.id == reaction.replyTo).one() + if parent_reaction.createdBy != reaction.createdBy: + prev_new_reply_notification = ( + session.query(Notification) + .where( + and_( + Notification.user == shout.created_by, + Notification.action == NotificationAction.CREATE, + Notification.entity == NotificationAction.REACTION, + # Notification.shout == shout.id, + # Notification.reaction == parent_reaction.id, + # TODO: filter by payload content + Notification.seen == False, # noqa: E712 + ) + ) + .first() + ) + + if prev_new_reply_notification: + update_prev_notification(prev_new_reply_notification, user, reaction) + else: + reply_notification_data = json.dumps( + { + "shout": shout_to_shout_data(shout), + "users": [user_to_user_data(user)], + "reactionIds": [reaction.id], + }, + ensure_ascii=False, + ) + + reply_notification = Notification.create( + **{ + "user": parent_reaction.created_by, + "action": NotificationAction.CREATE, + "entity": NotificationEntity.REACTION, + # TODO: filter by payload content + # "shout": shout.id, + # "reaction": parent_reaction.id, + "data": reply_notification_data, + } + ) + + session.add(reply_notification) + + notify_user_ids.append(parent_reaction.createdBy) + + if reaction.createdBy != shout.createdBy and ( + parent_reaction is None or parent_reaction.created_by != shout.created_by + ): + prev_new_comment_notification = ( + session.query(Notification) + .where( + and_( + Notification.user == shout.created_by, + Notification.action == NotificationAction.CREATE, + Notification.entity == NotificationEntity.REACTION, + Notification.seen == False, # noqa: E712 + ) + ) + .first() + ) + + if prev_new_comment_notification: + update_prev_notification(prev_new_comment_notification, user, reaction) + else: + notification_data_string = json.dumps( + { + "shout": shout_to_shout_data(shout), + "users": [user_to_user_data(user)], + "reactionIds": [reaction.id], + }, + ensure_ascii=False, + ) + + author_notification = Notification.create( + **{ + "user": shout.created_by, + "entity": NotificationEntity.REACTION, + "action": NotificationAction.CREATE, + "shout": shout.id, + "data": notification_data_string, + } + ) + + session.add(author_notification) + + notify_user_ids.append(shout.created_by) + + session.commit() + + for user_id in notify_user_ids: + await connection_manager.notify_user(user_id) + + +class NotificationService: + def __init__(self): + self._queue = asyncio.Queue(maxsize=1000) + + async def handle_reaction(self, reaction_id): + notificator = NewReactionNotificator(reaction_id) + await self._queue.put(notificator) + + async def worker(self): + while True: + notificator = await self._queue.get() + try: + await notificator.run() + except Exception as e: + print(f"[NotificationService.worker] error: {str(e)}") + + +notification_service = NotificationService() diff --git a/services/listener.py b/services/listener.py new file mode 100644 index 0000000..927d70a --- /dev/null +++ b/services/listener.py @@ -0,0 +1,43 @@ +import json +from services.rediscache import redis +from servies.notifier import notification_service + + +# Каналы для прослушивания +channels = ["reaction", "shout"] +pubsubs = [] + + +def create_notification_channel(redis_conn, channel_name): + pubsub = redis_conn.pubsub() + pubsub.subscribe(channel_name) + return pubsub + + +def close_notification_channel(pubsub): + pubsub.unsubscribe() + pubsub.close() + + +def start(): + # Подписка на каналы + pubsubs = [create_notification_channel(redis_conn, channel) for channel in channels] + + try: + # Бесконечный цикл прослушивания + while True: + for pubsub in pubsubs: + msg = pubsub.get_message() + notification_service.handle_reaction(msg["data"]) + + except Exception: + pass + finally: + # Отписка от каналов при завершении + for pubsub in pubsubs: + close_notification_channel(pubsub) + + +def stop(): + for pubsub in pubsubs: + close_notification_channel(pubsub)