From 62c8d51c5d1ec90703edfe6e3ae925124c076928 Mon Sep 17 00:00:00 2001 From: Untone Date: Sun, 26 Nov 2023 13:18:57 +0300 Subject: [PATCH] init-strawberry --- README.md | 6 +- main.py | 14 ++-- nginx.conf.sigil | 19 +---- notifier.graphql | 46 ----------- orm/user.py | 30 ------- pyproject.toml | 30 ++++--- resolvers/listener.py | 37 +++++++++ resolvers/notifications.py | 82 ------------------- resolvers/schema.py | 108 ++++++++++++++++++++++++ services/core.py | 108 ++++++++---------------- services/keeper.py | 164 ------------------------------------- services/listener.py | 43 ---------- services/schema.py | 5 -- 13 files changed, 207 insertions(+), 485 deletions(-) delete mode 100644 notifier.graphql delete mode 100644 orm/user.py create mode 100644 resolvers/listener.py delete mode 100644 resolvers/notifications.py create mode 100644 resolvers/schema.py delete mode 100644 services/keeper.py delete mode 100644 services/listener.py delete mode 100644 services/schema.py diff --git a/README.md b/README.md index a84385d..5178230 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ ### Что делает - - слушает redis PubSub каналы реакций и постов - - сохраняет уведомления - - формирует дайджесты + - сохраняет тех, кому уведомления уже были отправлены (redis: authors-online) + - формирует дайджесты для остальных + - слушает Redis PubSub канал с обновлениями реакций ### Что НЕ делает diff --git a/main.py b/main.py index 0f3461f..0484b09 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,15 @@ import os import asyncio -from importlib import import_module from os.path import exists -from ariadne import load_schema_from_path, make_executable_schema -from ariadne.asgi import GraphQL +from strawberry.asgi import GraphQL from starlette.applications import Starlette -from services.schema import resolvers from services.rediscache import redis -from services.keeper import notification_service +from resolvers.listener import start as listener_start, stop as listener_stop +from resolvers.schema import schema from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE -import_module("resolvers") -schema = make_executable_schema(load_schema_from_path("notifier.graphql"), resolvers) # type: ignore - async def start_up(): if MODE == "dev": @@ -26,7 +21,7 @@ async def start_up(): f.write(str(os.getpid())) else: await redis.connect() - notification_service_task = asyncio.create_task(notification_service.worker()) + notification_service_task = asyncio.create_task(listener_start()) print(f"[main] {notification_service_task}") try: @@ -39,6 +34,7 @@ async def start_up(): async def shutdown(): + listener_stop() await redis.disconnect() diff --git a/nginx.conf.sigil b/nginx.conf.sigil index a9d8314..765f41e 100644 --- a/nginx.conf.sigil +++ b/nginx.conf.sigil @@ -1,4 +1,4 @@ -# sigil ver 2.2 dufok 2022-10-15 (with location /connect nobuffer) +# sigil ver 2.2 dufok 2022-10-15 (with location /connect nobuffer) # Proxy settings {{ $proxy_settings := "proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $http_connection; proxy_set_header Host $http_host; proxy_set_header X-Request-Start $msec;" }} # GZIP settings @@ -54,23 +54,6 @@ server { {{ $cors_headers_get }} } - # Custom location block for /connect - location /connect/ { - proxy_pass http://presence-8080/; - add_header 'Cache-Control' 'no-cache'; - add_header 'Content-Type' 'text/event-stream'; - add_header 'Connection' 'keep-alive'; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_buffering off; - proxy_cache off; - proxy_read_timeout 36000s; - {{ $proxy_settings }} - {{ $cors_headers_options }} - {{ $cors_headers_post }} - {{ $cors_headers_get }} - } - # Error pages error_page 400 401 402 403 405 406 407 408 409 410 411 412 413 414 415 416 417 418 420 422 423 424 426 428 429 431 444 449 450 451 /400-error.html; diff --git a/notifier.graphql b/notifier.graphql deleted file mode 100644 index eee5112..0000000 --- a/notifier.graphql +++ /dev/null @@ -1,46 +0,0 @@ - -enum NotificationAction { - CREATE, - UPDATE, - DELETE, - SEEN -} -enum NotificationEntity { - SHOUT, - REACTION -} - - -type Notification { - id: Int! - action: NotificationAction! - entity: NotificationEntity! - created_at: Int! - seen: Boolean! - data: String # JSON - occurrences: Int -} - -input NotificationsQueryParams { - limit: Int - offset: Int -} - -type NotificationsQueryResult { - notifications: [Notification]! - total: Int! - unread: Int! -} - -type NotificationSeenResult { - error: String -} - -type Query { - loadNotifications(params: NotificationsQueryParams!): NotificationsQueryResult! -} - -type Mutation { - markNotificationAsRead(notification_id: Int!): NotificationSeenResult! - markAllNotificationsAsRead: NotificationSeenResult! -} diff --git a/orm/user.py b/orm/user.py deleted file mode 100644 index f673552..0000000 --- a/orm/user.py +++ /dev/null @@ -1,30 +0,0 @@ -import time - -from sqlalchemy import JSON, Boolean, Column, Integer, String - -from services.db import Base - - -class User(Base): - __tablename__ = "authorizer_users" - - id = Column(String, primary_key=True, unique=True, nullable=False, default=None) - key = Column(String) - email = Column(String, unique=True) - email_verified_at = Column(Integer) - family_name = Column(String) - gender = Column(String) - given_name = Column(String) - is_multi_factor_auth_enabled = Column(Boolean) - middle_name = Column(String) - nickname = Column(String) - password = Column(String) - phone_number = Column(String, unique=True) - phone_number_verified_at = Column(Integer) - # preferred_username = Column(String, nullable=False) - picture = Column(String) - revoked_timestamp = Column(Integer) - roles = Column(JSON) - signup_methods = Column(String, default="magic_link_login") - created_at = Column(Integer, default=lambda: int(time.time())) - updated_at = Column(Integer, default=lambda: int(time.time())) diff --git a/pyproject.toml b/pyproject.toml index 0c01ccc..ecd567f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,16 +14,14 @@ SQLAlchemy = "^2.0.22" httpx = "^0.25.0" psycopg2-binary = "^2.9.9" redis = {extras = ["hiredis"], version = "^5.0.1"} -sentry-sdk = "^1.32.0" -gql = {git = "https://github.com/graphql-python/gql.git", rev = "master"} -ariadne = {git = "https://github.com/tonyrewin/ariadne.git", rev = "master"} -starlette = {git = "https://github.com/encode/starlette.git", rev = "master"} uvicorn = "^0.24.0.post1" +strawberry-graphql = { extras = ["asgi"], version = "^0.215.1" } +sentry-sdk = "^1.37.1" [tool.poetry.dev-dependencies] pytest = "^7.4.2" black = { version = "^23.9.1", python = ">=3.12" } -ruff = { version = "^0.1.0", python = ">=3.12" } +mypy = { version = "^1.7", python = ">=3.12" } [tool.black] line-length = 120 @@ -57,12 +55,18 @@ use_parentheses = true ensure_newline_before_comments = true line_length = 120 - -[tool.ruff] -# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. -# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or -# McCabe complexity (`C901`) by default. -select = ["E4", "E7", "E9", "F"] +[tool.pyright] +include = ["orm", "resolvers"] +exclude = ["**/__pycache__"] ignore = [] -line-length = 120 -target-version = "py312" +defineConstant = { DEBUG = true } +reportMissingImports = true +reportMissingTypeStubs = false +pythonVersion = "312" +pythonPlatform = "Linux" +executionEnvironments = [] + +[tool.mypy] +python_version = "3.12" +warn_unused_configs = true +plugins = ["mypy_sqlalchemy.plugin"] diff --git a/resolvers/listener.py b/resolvers/listener.py new file mode 100644 index 0000000..3eb62c1 --- /dev/null +++ b/resolvers/listener.py @@ -0,0 +1,37 @@ +import json +from typing import List, Dict + +from orm.notification import Notification +from services.db import local_session +from services.rediscache import redis + + +def handle_reaction(notification: Dict[str, str | int | List[int]]): + """создаеёт новое хранимое уведомление""" + try: + with local_session() as session: + n = Notification(**notification) + session.add(n) + session.commit(n) + except Exception as e: + session.rollback() + print(f"[listener.handle_reaction] error: {str(e)}") + + +def stop(pubsub): + pubsub.unsubscribe() + pubsub.close() + + +def start(): + pubsub = redis.pubsub() + pubsub.subscribe("reaction") + try: + # Бесконечный цикл прослушивания + while True: + msg = pubsub.get_message() + handle_reaction(json.loads(msg["data"])) + except Exception: + pass + finally: + stop(pubsub) diff --git a/resolvers/notifications.py b/resolvers/notifications.py deleted file mode 100644 index 73bca5b..0000000 --- a/resolvers/notifications.py +++ /dev/null @@ -1,82 +0,0 @@ -from sqlalchemy import and_, desc, select, update - -from services.auth import login_required -from services.db import local_session -from services.schema import mutation, query -from orm.notification import Notification - -# TODO: occurrencies? - -# TODO: use of Author.id? - - -@query.field("loadNotifications") -@login_required -async def load_notifications(_, info, params=None): - if params is None: - params = {} - - user_id = info.context["user_id"] - - limit = params.get("limit", 50) - offset = params.get("offset", 0) - q = select(Notification).order_by(desc(Notification.created_at)).limit(limit).offset(offset) - - notifications = [] - with local_session() as session: - total_count = session.query(Notification).where(Notification.user == user_id).count() - - total_unread_count = ( - session.query(Notification) - .where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712 - .count() - ) - - for [notification] in session.execute(q): - notification.type = notification.type.name - notifications.append(notification) - - return { - "notifications": notifications, - "total": total_count, - "unread": total_unread_count, - } - - -@mutation.field("markNotificationAsRead") -@login_required -async def mark_notification_as_read(_, info, notification_id: int): - user_id = info.context["user_id"] - - with local_session() as session: - notification = ( - session.query(Notification) - .where(and_(Notification.id == notification_id, Notification.user == user_id)) - .one() - ) - notification.seen = True - session.commit() - - return {} - - -@mutation.field("markAllNotificationsAsRead") -@login_required -async def mark_all_notifications_as_read(_, info): - user_id = info.context["user_id"] - - statement = ( - update(Notification) - .where(and_(Notification.user == user_id, Notification.seen == False)) # noqa: E712 - .values(seen=True) - ) - - with local_session() as session: - try: - session.execute(statement) - session.commit() - except Exception as e: - session.rollback() - print(f"[mark_all_notifications_as_read] error: {str(e)}") - - return {} diff --git a/resolvers/schema.py b/resolvers/schema.py new file mode 100644 index 0000000..9e8fa98 --- /dev/null +++ b/resolvers/schema.py @@ -0,0 +1,108 @@ +import strawberry +from sqlalchemy import and_ +from orm.author import Author +from services.auth import login_required +from services.db import local_session + + +@strawberry.type +class NotificationSeen: + notification: int # notification id + viewer: int # author id + + +@strawberry.type +class Notification: + id: int + action: str # create update delete join follow etc. + entity: str # REACTION SHOUT + created_at: int + seen: list[NotificationSeen] + data: str # JSON data + occurrences: int + + +@strawberry.type +class NotificationsQueryResult: + notifications: list[Notification] + unread: int + + +@strawberry.type +class NotificationSeenResult: + error: str + + +def notification_seen_by_viewer(viewer_id, notification_id, session): + seen = ( + session.query(NotificationSeen) + .filter(NotificationSeen.viewer == viewer_id, NotificationSeen.notification == notification_id) + .first() + ) + + return seen is not None + + +@strawberry.type +class Query: + @login_required + @strawberry.field + async def load_notifications(self, info, limit: int = 50, offset: int = 0) -> dict: + """непрочитанные уведомления""" + user_id = info.context["user_id"] + + with local_session() as session: + author = session.query(Author).filter(Author.user == user_id).first() + + nslist = ( + session.query(Notification) + .outerjoin( + NotificationSeen, + and_(NotificationSeen.viewer == author.id, NotificationSeen.notification == Notification.id), + ) + .limit(limit) + .offset(offset) + .all() + ) + + for notification in nslist: + notification.seen_by_viewer = notification_seen_by_viewer(author.id, notification.id, session) + + unread = sum(1 for n in nslist if not n.seen_by_viewer) + + return {"notifications": nslist, "unread": unread} + + +@strawberry.type +class Mutation: + @strawberry.mutation + @login_required + async def mark_notification_as_read(self, info, notification_id: int) -> NotificationSeenResult: + user_id = info.context["user_id"] + try: + with local_session() as session: + author = session.query(Author).filter(Author.user == user_id).first() + ns = NotificationSeen({"notification": notification_id, "viewer": author.id}) + session.add(ns) + session.commit() + except Exception as e: + session.rollback() + print(f"[mark_notification_as_read] error: {str(e)}") + return {} + + @strawberry.mutation + @login_required + async def mark_all_notifications_as_read(self, info) -> NotificationSeenResult: + user_id = info.context["user_id"] + + with local_session() as session: + try: + author = session.query(Author).filter(Author.user == user_id).first() + _nslist = session.quuery(NotificationSeen).filter(NotificationSeen.viewer == author.id).all() + except Exception as e: + session.rollback() + print(f"[mark_all_notifications_as_read] error: {str(e)}") + return {} + + +schema = strawberry.Schema(query=Query, mutation=Mutation) diff --git a/services/core.py b/services/core.py index 0fc4660..e5dd779 100644 --- a/services/core.py +++ b/services/core.py @@ -1,82 +1,18 @@ -from httpx import AsyncClient -from settings import API_BASE from typing import List -from models.member import ChatMember + +from httpx import AsyncClient + +from orm.author import Author +from orm.shout import Shout +from settings import API_BASE headers = {"Content-Type": "application/json"} -async def get_all_authors() -> List[ChatMember]: - query_name = "authorsAll" - query_type = "query" - operation = "AuthorsAll" - query_fields = "id slug pic name" - - gql = { - "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }", - "operationName": operation, - "variables": None, - } - +async def _request_endpoint(query_name, body): async with AsyncClient() as client: try: - response = await client.post(API_BASE, headers=headers, json=gql) - print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes") - if response.status_code != 200: - return [] - r = response.json() - if r: - return r.get("data", {}).get(query_name, []) - except Exception: - import traceback - - traceback.print_exc() - return [] - - -async def get_my_followings() -> List[ChatMember]: - query_name = "loadMySubscriptions" - query_type = "query" - operation = "LoadMySubscriptions" - query_fields = "id slug pic name" - - gql = { - "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + " } " + " }", - "operationName": operation, - "variables": None, - } - - async with AsyncClient() as client: - try: - response = await client.post(API_BASE, headers=headers, json=gql) - print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes") - if response.status_code != 200: - return [] - r = response.json() - if r: - return r.get("data", {}).get(query_name, {}).get("authors", []) - except Exception: - import traceback - - traceback.print_exc() - return [] - - -async def get_author(author_id) -> Author: - query_name = "getAuthor" - query_type = "query" - operation = "GetAuthor" - query_fields = "id slug pic name" - - gql = { - "query": query_type + " " + operation + " { " + query_name + " { authors {" + query_fields + "} } " + " }", - "operationName": operation, - "variables": None, - } - - async with AsyncClient() as client: - try: - response = await client.post(API_BASE, headers=headers, json=gql) + response = await client.post(API_BASE, headers=headers, json=body) print(f"[services.core] {query_name}: [{response.status_code}] {len(response.text)} bytes") if response.status_code != 200: return [] @@ -89,3 +25,31 @@ async def get_author(author_id) -> Author: import traceback traceback.print_exc() + + +async def get_author(author_id) -> Author: + query_name = "getAuthor" + query_type = "query" + operation = "GetAuthor" + query_fields = "id slug pic name" + + gql = { + "query": query_type + " " + operation + " { " + query_name + " { " + query_fields + "м} " + " }", + "operationName": operation, + "variables": None, + } + + return await _request_endpoint(query_name, gql) + + +async def get_followed_shouts(author_id: int) -> List[Shout]: + query_name = "getFollowedShouts" + query_type = "query" + operation = "GetFollowedShouts" + query_fields = "id slug title" + + query = f"{query_type} {operation}($author_id: Int!) {{ {query_name}(author_id: $author_id) {{ {query_fields} }} }}" + + body = {"query": query, "operationName": operation, "variables": {"author_id": author_id}} + + return await _request_endpoint(query_name, body) diff --git a/services/keeper.py b/services/keeper.py deleted file mode 100644 index 3b658b1..0000000 --- a/services/keeper.py +++ /dev/null @@ -1,164 +0,0 @@ -import asyncio -import json -from datetime import datetime, timezone - -from sqlalchemy import and_ - -from services.db import local_session -from orm.shout import Shout -from orm.author import Author -from orm.user import User -from orm.notification import NotificationAction, NotificationEntity, Notification -from orm.reaction import ReactionKind, Reaction - - -def shout_to_shout_data(shout): - return {"title": shout.title, "slug": shout.slug} - - -def user_to_user_data(user): - return {"id": user.id, "name": user.name, "slug": user.slug, "userpic": user.userpic} - - -def update_prev_notification(notification, user, reaction): - notification_data = json.loads(notification.data) - - notification_data["users"] = [u for u in notification_data["users"] if u["id"] != user.id] - notification_data["users"].append(user_to_user_data(user)) - - if notification_data["reactionIds"] is None: - notification_data["reactionIds"] = [] - notification_data["reactionIds"].append(reaction.id) - - notification.data = json.dumps(notification_data, ensure_ascii=False) - notification.seen = False - notification.occurrences = notification.occurrences + 1 - notification.createdAt = datetime.now(tz=timezone.utc) - - -class NewReactionNotificator: - def __init__(self, reaction_id): - self.reaction_id = reaction_id - - async def run(self): - with local_session() as session: - reaction = session.query(Reaction).where(Reaction.id == self.reaction_id).one() - shout = session.query(Shout).where(Shout.id == reaction.shout).one() - user = session.query(User).where(User.id == reaction.created_by).one() - notify_user_ids = [] - - if reaction.kind == ReactionKind.COMMENT: - parent_reaction = None - if reaction.replyTo: - parent_reaction = session.query(Reaction).where(Reaction.id == reaction.replyTo).one() - if parent_reaction.createdBy != reaction.createdBy: - prev_new_reply_notification = ( - session.query(Notification) - .where( - and_( - Notification.user == shout.created_by, - Notification.action == NotificationAction.CREATE, - Notification.entity == NotificationAction.REACTION, - # Notification.shout == shout.id, - # Notification.reaction == parent_reaction.id, - # TODO: filter by payload content - Notification.seen == False, # noqa: E712 - ) - ) - .first() - ) - - if prev_new_reply_notification: - update_prev_notification(prev_new_reply_notification, user, reaction) - else: - reply_notification_data = json.dumps( - { - "shout": shout_to_shout_data(shout), - "users": [user_to_user_data(user)], - "reactionIds": [reaction.id], - }, - ensure_ascii=False, - ) - - reply_notification = Notification.create( - **{ - "user": parent_reaction.created_by, - "action": NotificationAction.CREATE, - "entity": NotificationEntity.REACTION, - # TODO: filter by payload content - # "shout": shout.id, - # "reaction": parent_reaction.id, - "data": reply_notification_data, - } - ) - - session.add(reply_notification) - - notify_user_ids.append(parent_reaction.createdBy) - - if reaction.createdBy != shout.createdBy and ( - parent_reaction is None or parent_reaction.created_by != shout.created_by - ): - prev_new_comment_notification = ( - session.query(Notification) - .where( - and_( - Notification.user == shout.created_by, - Notification.action == NotificationAction.CREATE, - Notification.entity == NotificationEntity.REACTION, - Notification.seen == False, # noqa: E712 - ) - ) - .first() - ) - - if prev_new_comment_notification: - update_prev_notification(prev_new_comment_notification, user, reaction) - else: - notification_data_string = json.dumps( - { - "shout": shout_to_shout_data(shout), - "users": [user_to_user_data(user)], - "reactionIds": [reaction.id], - }, - ensure_ascii=False, - ) - - author_notification = Notification.create( - **{ - "user": shout.created_by, - "entity": NotificationEntity.REACTION, - "action": NotificationAction.CREATE, - "shout": shout.id, - "data": notification_data_string, - } - ) - - session.add(author_notification) - - notify_user_ids.append(shout.created_by) - - session.commit() - - for user_id in notify_user_ids: - await connection_manager.notify_user(user_id) - - -class NotificationService: - def __init__(self): - self._queue = asyncio.Queue(maxsize=1000) - - async def handle_reaction(self, reaction_id): - notificator = NewReactionNotificator(reaction_id) - await self._queue.put(notificator) - - async def worker(self): - while True: - notificator = await self._queue.get() - try: - await notificator.run() - except Exception as e: - print(f"[NotificationService.worker] error: {str(e)}") - - -notification_service = NotificationService() diff --git a/services/listener.py b/services/listener.py deleted file mode 100644 index 927d70a..0000000 --- a/services/listener.py +++ /dev/null @@ -1,43 +0,0 @@ -import json -from services.rediscache import redis -from servies.notifier import notification_service - - -# Каналы для прослушивания -channels = ["reaction", "shout"] -pubsubs = [] - - -def create_notification_channel(redis_conn, channel_name): - pubsub = redis_conn.pubsub() - pubsub.subscribe(channel_name) - return pubsub - - -def close_notification_channel(pubsub): - pubsub.unsubscribe() - pubsub.close() - - -def start(): - # Подписка на каналы - pubsubs = [create_notification_channel(redis_conn, channel) for channel in channels] - - try: - # Бесконечный цикл прослушивания - while True: - for pubsub in pubsubs: - msg = pubsub.get_message() - notification_service.handle_reaction(msg["data"]) - - except Exception: - pass - finally: - # Отписка от каналов при завершении - for pubsub in pubsubs: - close_notification_channel(pubsub) - - -def stop(): - for pubsub in pubsubs: - close_notification_channel(pubsub) diff --git a/services/schema.py b/services/schema.py deleted file mode 100644 index 8bd983e..0000000 --- a/services/schema.py +++ /dev/null @@ -1,5 +0,0 @@ -from ariadne import QueryType, MutationType - -query = QueryType() -mutation = MutationType() -resolvers = [query, mutation]