From deac939ed8f110bb61c41dcd24fa0882e9880246 Mon Sep 17 00:00:00 2001 From: Tony Rewin Date: Thu, 5 Oct 2023 21:46:18 +0300 Subject: [PATCH] restructured,inbox-removed --- alembic/env.py | 7 +- auth/authenticate.py | 44 ++--- auth/identity.py | 33 ++-- auth/jwtcodec.py | 22 +-- auth/tokenstorage.py | 8 +- base/exceptions.py | 38 ---- base/redis.py | 44 ----- main.py | 47 ++--- migration/tables/comments.py | 84 ++++----- migration/tables/content_items.py | 117 +++++++----- migration/tables/remarks.py | 38 ++-- migration/tables/topics.py | 4 +- migration/tables/users.py | 23 ++- orm/__init__.py | 2 +- orm/collection.py | 2 +- orm/community.py | 8 +- orm/notification.py | 2 +- orm/rbac.py | 13 +- orm/reaction.py | 14 +- orm/shout.py | 10 +- orm/topic.py | 6 +- orm/user.py | 2 +- resolvers/__init__.py | 79 ++------ resolvers/auth.py | 41 ++-- resolvers/{create => }/editor.py | 89 +++++---- resolvers/{zine => }/following.py | 70 ++++--- resolvers/inbox/chats.py | 124 ------------- resolvers/inbox/load.py | 152 --------------- resolvers/inbox/messages.py | 179 ------------------ resolvers/inbox/search.py | 95 ---------- resolvers/inbox/unread.py | 22 --- resolvers/{zine => }/load.py | 185 +++++++++++-------- resolvers/{create => }/migrate.py | 3 +- resolvers/{zine => }/profile.py | 109 +++++------ resolvers/{zine => }/reactions.py | 226 ++++++++++++----------- resolvers/{zine => }/topics.py | 35 ++-- {services/stat => schemas}/ackee.graphql | 0 schema.graphql => schemas/core.graphql | 87 --------- base/orm.py => services/db.py | 0 services/exceptions.py | 39 ++++ services/inbox/presence.py | 46 ----- services/inbox/sse.py | 22 --- services/main.py | 12 +- services/presence.py | 35 ++++ services/redis.py | 58 ++++++ base/resolvers.py => services/schema.py | 0 services/search.py | 10 +- services/{stat => }/viewed.py | 79 ++++---- services/zine/gittask.py | 70 ------- 49 files changed, 886 insertions(+), 1549 deletions(-) delete mode 100644 base/exceptions.py delete mode 100644 base/redis.py rename resolvers/{create => }/editor.py (62%) rename resolvers/{zine => }/following.py (66%) delete mode 100644 resolvers/inbox/chats.py delete mode 100644 resolvers/inbox/load.py delete mode 100644 resolvers/inbox/messages.py delete mode 100644 resolvers/inbox/search.py delete mode 100644 resolvers/inbox/unread.py rename resolvers/{zine => }/load.py (55%) rename resolvers/{create => }/migrate.py (86%) rename resolvers/{zine => }/profile.py (74%) rename resolvers/{zine => }/reactions.py (63%) rename resolvers/{zine => }/topics.py (81%) rename {services/stat => schemas}/ackee.graphql (100%) rename schema.graphql => schemas/core.graphql (79%) rename base/orm.py => services/db.py (100%) create mode 100644 services/exceptions.py delete mode 100644 services/inbox/presence.py delete mode 100644 services/inbox/sse.py create mode 100644 services/presence.py create mode 100644 services/redis.py rename base/resolvers.py => services/schema.py (100%) rename services/{stat => }/viewed.py (76%) delete mode 100644 services/zine/gittask.py diff --git a/alembic/env.py b/alembic/env.py index c6d69a97..128945e0 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -19,7 +19,8 @@ config.set_section_option(config.config_ini_section, "DB_URL", DB_URL) if config.config_file_name is not None: fileConfig(config.config_file_name) -from base.orm import Base +from services.db import Base + target_metadata = [Base.metadata] # other values from the config, defined by the needs of env.py, @@ -66,9 +67,7 @@ def run_migrations_online() -> None: ) with connectable.connect() as connection: - context.configure( - connection=connection, target_metadata=target_metadata - ) + context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() diff --git a/auth/authenticate.py b/auth/authenticate.py index be4db2d2..0574c08f 100644 --- a/auth/authenticate.py +++ b/auth/authenticate.py @@ -7,57 +7,59 @@ from starlette.authentication import AuthenticationBackend from starlette.requests import HTTPConnection from auth.credentials import AuthCredentials, AuthUser -from base.orm import local_session +from services.db import local_session from orm.user import User, Role from settings import SESSION_TOKEN_HEADER from auth.tokenstorage import SessionToken -from base.exceptions import OperationNotAllowed +from services.exceptions import OperationNotAllowed class JWTAuthenticate(AuthenticationBackend): async def authenticate( self, request: HTTPConnection ) -> Optional[Tuple[AuthCredentials, AuthUser]]: - if SESSION_TOKEN_HEADER not in request.headers: - return AuthCredentials(scopes={}), AuthUser(user_id=None, username='') + return AuthCredentials(scopes={}), AuthUser(user_id=None, username="") token = request.headers.get(SESSION_TOKEN_HEADER) if not token: print("[auth.authenticate] no token in header %s" % SESSION_TOKEN_HEADER) return AuthCredentials(scopes={}, error_message=str("no token")), AuthUser( - user_id=None, username='' + user_id=None, username="" ) - if len(token.split('.')) > 1: + if len(token.split(".")) > 1: payload = await SessionToken.verify(token) with local_session() as session: try: user = ( - session.query(User).options( - joinedload(User.roles).options(joinedload(Role.permissions)), - joinedload(User.ratings) - ).filter( - User.id == payload.user_id - ).one() + session.query(User) + .options( + joinedload(User.roles).options( + joinedload(Role.permissions) + ), + joinedload(User.ratings), + ) + .filter(User.id == payload.user_id) + .one() ) scopes = {} # TODO: integrate await user.get_permission() return ( AuthCredentials( - user_id=payload.user_id, - scopes=scopes, - logged_in=True + user_id=payload.user_id, scopes=scopes, logged_in=True ), - AuthUser(user_id=user.id, username=''), + AuthUser(user_id=user.id, username=""), ) except exc.NoResultFound: pass - return AuthCredentials(scopes={}, error_message=str('Invalid token')), AuthUser(user_id=None, username='') + return AuthCredentials(scopes={}, error_message=str("Invalid token")), AuthUser( + user_id=None, username="" + ) def login_required(func): @@ -68,9 +70,7 @@ def login_required(func): # print(auth) if not auth or not auth.logged_in: # raise Unauthorized(auth.error_message or "Please login") - return { - "error": "Please login first" - } + return {"error": "Please login first"} return await func(parent, info, *args, **kwargs) return wrap @@ -79,7 +79,9 @@ def login_required(func): def permission_required(resource, operation, func): @wraps(func) async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs): - print('[auth.authenticate] permission_required for %r with info %r' % (func, info)) # debug only + print( + "[auth.authenticate] permission_required for %r with info %r" % (func, info) + ) # debug only auth: AuthCredentials = info.context["request"].auth if not auth.logged_in: raise OperationNotAllowed(auth.error_message or "Please login") diff --git a/auth/identity.py b/auth/identity.py index e4b78040..0fff8c96 100644 --- a/auth/identity.py +++ b/auth/identity.py @@ -7,8 +7,9 @@ from sqlalchemy import or_ from auth.jwtcodec import JWTCodec from auth.tokenstorage import TokenStorage + # from base.exceptions import InvalidPassword, InvalidToken -from base.orm import local_session +from services.db import local_session from orm import User from validations.auth import AuthInput @@ -57,14 +58,10 @@ class Identity: user = User(**orm_user.dict()) if not user.password: # raise InvalidPassword("User password is empty") - return { - "error": "User password is empty" - } + return {"error": "User password is empty"} if not Password.verify(password, user.password): # raise InvalidPassword("Wrong user password") - return { - "error": "Wrong user password" - } + return {"error": "Wrong user password"} return user @staticmethod @@ -87,30 +84,24 @@ class Identity: @staticmethod async def onetime(token: str) -> User: try: - print('[auth.identity] using one time token') + print("[auth.identity] using one time token") payload = JWTCodec.decode(token) - if not await TokenStorage.exist(f"{payload.user_id}-{payload.username}-{token}"): + if not await TokenStorage.exist( + f"{payload.user_id}-{payload.username}-{token}" + ): # raise InvalidToken("Login token has expired, please login again") - return { - "error": "Token has expired" - } + return {"error": "Token has expired"} except ExpiredSignatureError: # raise InvalidToken("Login token has expired, please try again") - return { - "error": "Token has expired" - } + return {"error": "Token has expired"} except DecodeError: # raise InvalidToken("token format error") from e - return { - "error": "Token format error" - } + return {"error": "Token format error"} with local_session() as session: user = session.query(User).filter_by(id=payload.user_id).first() if not user: # raise Exception("user not exist") - return { - "error": "User does not exist" - } + return {"error": "User does not exist"} if not user.emailConfirmed: user.emailConfirmed = True session.commit() diff --git a/auth/jwtcodec.py b/auth/jwtcodec.py index d4d2116f..fb15b5f7 100644 --- a/auth/jwtcodec.py +++ b/auth/jwtcodec.py @@ -1,6 +1,6 @@ from datetime import datetime, timezone import jwt -from base.exceptions import ExpiredToken, InvalidToken +from services.exceptions import ExpiredToken, InvalidToken from validations.auth import TokenPayload, AuthInput from settings import JWT_ALGORITHM, JWT_SECRET_KEY @@ -13,12 +13,12 @@ class JWTCodec: "username": user.email or user.phone, "exp": exp, "iat": datetime.now(tz=timezone.utc), - "iss": "discours" + "iss": "discours", } try: return jwt.encode(payload, JWT_SECRET_KEY, JWT_ALGORITHM) except Exception as e: - print('[auth.jwtcodec] JWT encode error %r' % e) + print("[auth.jwtcodec] JWT encode error %r" % e) @staticmethod def decode(token: str, verify_exp: bool = True) -> TokenPayload: @@ -33,18 +33,18 @@ class JWTCodec: # "verify_signature": False }, algorithms=[JWT_ALGORITHM], - issuer="discours" + issuer="discours", ) r = TokenPayload(**payload) - print('[auth.jwtcodec] debug token %r' % r) + print("[auth.jwtcodec] debug token %r" % r) return r except jwt.InvalidIssuedAtError: - print('[auth.jwtcodec] invalid issued at: %r' % payload) - raise ExpiredToken('check token issued time') + print("[auth.jwtcodec] invalid issued at: %r" % payload) + raise ExpiredToken("check token issued time") except jwt.ExpiredSignatureError: - print('[auth.jwtcodec] expired signature %r' % payload) - raise ExpiredToken('check token lifetime') + print("[auth.jwtcodec] expired signature %r" % payload) + raise ExpiredToken("check token lifetime") except jwt.InvalidTokenError: - raise InvalidToken('token is not valid') + raise InvalidToken("token is not valid") except jwt.InvalidSignatureError: - raise InvalidToken('token is not valid') + raise InvalidToken("token is not valid") diff --git a/auth/tokenstorage.py b/auth/tokenstorage.py index c61aa848..4c28ffa3 100644 --- a/auth/tokenstorage.py +++ b/auth/tokenstorage.py @@ -2,14 +2,16 @@ from datetime import datetime, timedelta, timezone from auth.jwtcodec import JWTCodec from validations.auth import AuthInput -from base.redis import redis +from services.redis import redis from settings import SESSION_TOKEN_LIFE_SPAN, ONETIME_TOKEN_LIFE_SPAN async def save(token_key, life_span, auto_delete=True): await redis.execute("SET", token_key, "True") if auto_delete: - expire_at = (datetime.now(tz=timezone.utc) + timedelta(seconds=life_span)).timestamp() + expire_at = ( + datetime.now(tz=timezone.utc) + timedelta(seconds=life_span) + ).timestamp() await redis.execute("EXPIREAT", token_key, int(expire_at)) @@ -35,7 +37,7 @@ class SessionToken: class TokenStorage: @staticmethod async def get(token_key): - print('[tokenstorage.get] ' + token_key) + print("[tokenstorage.get] " + token_key) # 2041-user@domain.zn-eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyMDQxLCJ1c2VybmFtZSI6ImFudG9uLnJld2luK3Rlc3QtbG9hZGNoYXRAZ21haWwuY29tIiwiZXhwIjoxNjcxNzgwNjE2LCJpYXQiOjE2NjkxODg2MTYsImlzcyI6ImRpc2NvdXJzIn0.Nml4oV6iMjMmc6xwM7lTKEZJKBXvJFEIZ-Up1C1rITQ return await redis.execute("GET", token_key) diff --git a/base/exceptions.py b/base/exceptions.py deleted file mode 100644 index 1f3344e7..00000000 --- a/base/exceptions.py +++ /dev/null @@ -1,38 +0,0 @@ -from graphql.error import GraphQLError - - -# TODO: remove traceback from logs for defined exceptions - -class BaseHttpException(GraphQLError): - code = 500 - message = "500 Server error" - - -class ExpiredToken(BaseHttpException): - code = 401 - message = "401 Expired Token" - - -class InvalidToken(BaseHttpException): - code = 401 - message = "401 Invalid Token" - - -class Unauthorized(BaseHttpException): - code = 401 - message = "401 Unauthorized" - - -class ObjectNotExist(BaseHttpException): - code = 404 - message = "404 Object Does Not Exist" - - -class OperationNotAllowed(BaseHttpException): - code = 403 - message = "403 Operation Is Not Allowed" - - -class InvalidPassword(BaseHttpException): - code = 403 - message = "403 Invalid Password" diff --git a/base/redis.py b/base/redis.py deleted file mode 100644 index e1a4b903..00000000 --- a/base/redis.py +++ /dev/null @@ -1,44 +0,0 @@ -from aioredis import from_url -from asyncio import sleep -from settings import REDIS_URL - - -class RedisCache: - def __init__(self, uri=REDIS_URL): - self._uri: str = uri - self._instance = None - - async def connect(self): - if self._instance is not None: - return - self._instance = await from_url(self._uri, encoding="utf-8") - # print(self._instance) - - async def disconnect(self): - if self._instance is None: - return - await self._instance.close() - # await self._instance.wait_closed() # deprecated - self._instance = None - - async def execute(self, command, *args, **kwargs): - while not self._instance: - await sleep(1) - try: - print("[redis] " + command + ' ' + ' '.join(args)) - return await self._instance.execute_command(command, *args, **kwargs) - except Exception: - pass - - async def lrange(self, key, start, stop): - print(f"[redis] LRANGE {key} {start} {stop}") - return await self._instance.lrange(key, start, stop) - - async def mget(self, key, *keys): - print(f"[redis] MGET {key} {keys}") - return await self._instance.mget(key, *keys) - - -redis = RedisCache() - -__all__ = ["redis"] diff --git a/main.py b/main.py index 5b3e5c49..be2c9f93 100644 --- a/main.py +++ b/main.py @@ -13,21 +13,16 @@ from orm import init_tables from auth.authenticate import JWTAuthenticate from auth.oauth import oauth_login, oauth_authorize -from base.redis import redis -from base.resolvers import resolvers +from services.redis import redis +from services.schema import resolvers from resolvers.auth import confirm_email_handler from resolvers.upload import upload_handler from services.main import storages_init -from services.stat.viewed import ViewedStorage -from services.zine.gittask import GitTask +from services.viewed import ViewedStorage from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN -# from sse.transport import GraphQLSSEHandler -from services.inbox.presence import on_connect, on_disconnect -# from services.inbox.sse import sse_messages -from ariadne.asgi.handlers import GraphQLTransportWSHandler import_module("resolvers") -schema = make_executable_schema(load_schema_from_path("schema.graphql"), resolvers) # type: ignore +schema = make_executable_schema(load_schema_from_path("schemas/core.graphql"), resolvers) # type: ignore middleware = [ Middleware(AuthenticationMiddleware, backend=JWTAuthenticate()), @@ -41,13 +36,12 @@ async def start_up(): await storages_init() views_stat_task = asyncio.create_task(ViewedStorage().worker()) print(views_stat_task) - git_task = asyncio.create_task(GitTask.git_task_worker()) - print(git_task) try: import sentry_sdk + sentry_sdk.init(SENTRY_DSN) except Exception as e: - print('[sentry] init error') + print("[sentry] init error") print(e) @@ -56,7 +50,7 @@ async def dev_start_up(): await redis.connect() return else: - with open(DEV_SERVER_PID_FILE_NAME, 'w', encoding='utf-8') as f: + with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f: f.write(str(os.getpid())) await start_up() @@ -67,11 +61,10 @@ async def shutdown(): routes = [ - # Route("/messages", endpoint=sse_messages), Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth-authorize", endpoint=oauth_authorize), Route("/confirm/{token}", endpoint=confirm_email_handler), - Route("/upload", endpoint=upload_handler, methods=['POST']) + Route("/upload", endpoint=upload_handler, methods=["POST"]), ] app = Starlette( @@ -81,14 +74,10 @@ app = Starlette( middleware=middleware, routes=routes, ) -app.mount("/", GraphQL( - schema, - debug=True, - websocket_handler=GraphQLTransportWSHandler( - on_connect=on_connect, - on_disconnect=on_disconnect - ) -)) +app.mount( + "/", + GraphQL(schema, debug=True), +) dev_app = app = Starlette( debug=True, @@ -97,11 +86,7 @@ dev_app = app = Starlette( middleware=middleware, routes=routes, ) -dev_app.mount("/", GraphQL( - schema, - debug=True, - websocket_handler=GraphQLTransportWSHandler( - on_connect=on_connect, - on_disconnect=on_disconnect - ) -)) +dev_app.mount( + "/", + GraphQL(schema, debug=True), +) diff --git a/migration/tables/comments.py b/migration/tables/comments.py index 82e32924..d5f35c20 100644 --- a/migration/tables/comments.py +++ b/migration/tables/comments.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone from dateutil.parser import parse as date_parse -from base.orm import local_session +from services.db import local_session from migration.html2text import html2text from orm.reaction import Reaction, ReactionKind from orm.shout import ShoutReactionsFollower @@ -15,34 +15,28 @@ ts = datetime.now(tz=timezone.utc) def auto_followers(session, topics, reaction_dict): # creating shout's reactions following for reaction author - following1 = session.query( - ShoutReactionsFollower - ).where( - ShoutReactionsFollower.follower == reaction_dict["createdBy"] - ).filter( - ShoutReactionsFollower.shout == reaction_dict["shout"] - ).first() + following1 = ( + session.query(ShoutReactionsFollower) + .where(ShoutReactionsFollower.follower == reaction_dict["createdBy"]) + .filter(ShoutReactionsFollower.shout == reaction_dict["shout"]) + .first() + ) if not following1: following1 = ShoutReactionsFollower.create( - follower=reaction_dict["createdBy"], - shout=reaction_dict["shout"], - auto=True + follower=reaction_dict["createdBy"], shout=reaction_dict["shout"], auto=True ) session.add(following1) # creating topics followings for reaction author for t in topics: - tf = session.query( - TopicFollower - ).where( - TopicFollower.follower == reaction_dict["createdBy"] - ).filter( - TopicFollower.topic == t['id'] - ).first() + tf = ( + session.query(TopicFollower) + .where(TopicFollower.follower == reaction_dict["createdBy"]) + .filter(TopicFollower.topic == t["id"]) + .first() + ) if not tf: topic_following = TopicFollower.create( - follower=reaction_dict["createdBy"], - topic=t['id'], - auto=True + follower=reaction_dict["createdBy"], topic=t["id"], auto=True ) session.add(topic_following) @@ -68,18 +62,15 @@ def migrate_ratings(session, entry, reaction_dict): try: # creating reaction from old rating rr = Reaction.create(**re_reaction_dict) - following2 = session.query( - ShoutReactionsFollower - ).where( - ShoutReactionsFollower.follower == re_reaction_dict['createdBy'] - ).filter( - ShoutReactionsFollower.shout == rr.shout - ).first() + following2 = ( + session.query(ShoutReactionsFollower) + .where(ShoutReactionsFollower.follower == re_reaction_dict["createdBy"]) + .filter(ShoutReactionsFollower.shout == rr.shout) + .first() + ) if not following2: following2 = ShoutReactionsFollower.create( - follower=re_reaction_dict['createdBy'], - shout=rr.shout, - auto=True + follower=re_reaction_dict["createdBy"], shout=rr.shout, auto=True ) session.add(following2) session.add(rr) @@ -150,9 +141,11 @@ async def migrate(entry, storage): else: stage = "author and old id found" try: - shout = session.query( - Shout - ).where(Shout.slug == old_shout["slug"]).one() + shout = ( + session.query(Shout) + .where(Shout.slug == old_shout["slug"]) + .one() + ) if shout: reaction_dict["shout"] = shout.id reaction_dict["createdBy"] = author.id if author else 1 @@ -178,9 +171,9 @@ async def migrate(entry, storage): def migrate_2stage(old_comment, idmap): - if old_comment.get('body'): - new_id = idmap.get(old_comment.get('oid')) - new_id = idmap.get(old_comment.get('_id')) + if old_comment.get("body"): + new_id = idmap.get(old_comment.get("oid")) + new_id = idmap.get(old_comment.get("_id")) if new_id: new_replyto_id = None old_replyto_id = old_comment.get("replyTo") @@ -190,17 +183,22 @@ def migrate_2stage(old_comment, idmap): comment = session.query(Reaction).where(Reaction.id == new_id).first() try: if new_replyto_id: - new_reply = session.query(Reaction).where(Reaction.id == new_replyto_id).first() + new_reply = ( + session.query(Reaction) + .where(Reaction.id == new_replyto_id) + .first() + ) if not new_reply: print(new_replyto_id) raise Exception("cannot find reply by id!") comment.replyTo = new_reply.id session.add(comment) - srf = session.query(ShoutReactionsFollower).where( - ShoutReactionsFollower.shout == comment.shout - ).filter( - ShoutReactionsFollower.follower == comment.createdBy - ).first() + srf = ( + session.query(ShoutReactionsFollower) + .where(ShoutReactionsFollower.shout == comment.shout) + .filter(ShoutReactionsFollower.follower == comment.createdBy) + .first() + ) if not srf: srf = ShoutReactionsFollower.create( shout=comment.shout, follower=comment.createdBy, auto=True diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py index 2e74f96e..ec263fc6 100644 --- a/migration/tables/content_items.py +++ b/migration/tables/content_items.py @@ -3,13 +3,13 @@ import json from dateutil.parser import parse as date_parse from sqlalchemy.exc import IntegrityError from transliterate import translit -from base.orm import local_session +from services.db import local_session from migration.extract import extract_html, extract_media from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.user import User from orm.topic import TopicFollower, Topic -from services.stat.viewed import ViewedStorage +from services.viewed import ViewedStorage import re OLD_DATE = "2016-03-05 22:22:00.350000" @@ -33,7 +33,7 @@ def get_shout_slug(entry): slug = friend.get("slug", "") if slug: break - slug = re.sub('[^0-9a-zA-Z]+', '-', slug) + slug = re.sub("[^0-9a-zA-Z]+", "-", slug) return slug @@ -41,28 +41,30 @@ def create_author_from_app(app): user = None userdata = None # check if email is used - if app['email']: + if app["email"]: with local_session() as session: - user = session.query(User).where(User.email == app['email']).first() + user = session.query(User).where(User.email == app["email"]).first() if not user: # print('[migration] app %r' % app) - name = app.get('name') + name = app.get("name") if name: slug = translit(name, "ru", reversed=True).lower() - slug = re.sub('[^0-9a-zA-Z]+', '-', slug) - print('[migration] created slug %s' % slug) + slug = re.sub("[^0-9a-zA-Z]+", "-", slug) + print("[migration] created slug %s" % slug) # check if slug is used if slug: user = session.query(User).where(User.slug == slug).first() # get slug from email if user: - slug = app['email'].split('@')[0] + slug = app["email"].split("@")[0] user = session.query(User).where(User.slug == slug).first() # one more try if user: - slug += '-author' - user = session.query(User).where(User.slug == slug).first() + slug += "-author" + user = ( + session.query(User).where(User.slug == slug).first() + ) # create user with application data if not user: @@ -80,7 +82,7 @@ def create_author_from_app(app): user = User.create(**userdata) session.add(user) session.commit() - userdata['id'] = user.id + userdata["id"] = user.id userdata = user.dict() return userdata @@ -92,13 +94,16 @@ async def create_shout(shout_dict): s = Shout.create(**shout_dict) author = s.authors[0] with local_session() as session: - srf = session.query(ShoutReactionsFollower).where( - ShoutReactionsFollower.shout == s.id - ).filter( - ShoutReactionsFollower.follower == author.id - ).first() + srf = ( + session.query(ShoutReactionsFollower) + .where(ShoutReactionsFollower.shout == s.id) + .filter(ShoutReactionsFollower.follower == author.id) + .first() + ) if not srf: - srf = ShoutReactionsFollower.create(shout=s.id, follower=author.id, auto=True) + srf = ShoutReactionsFollower.create( + shout=s.id, follower=author.id, auto=True + ) session.add(srf) session.commit() return s @@ -117,14 +122,14 @@ async def get_user(entry, storage): elif user_oid: userdata = storage["users"]["by_oid"].get(user_oid) if not userdata: - print('no userdata by oid, anonymous') + print("no userdata by oid, anonymous") userdata = anondict print(app) # cleanup slug if userdata: slug = userdata.get("slug", "") if slug: - slug = re.sub('[^0-9a-zA-Z]+', '-', slug) + slug = re.sub("[^0-9a-zA-Z]+", "-", slug) userdata["slug"] = slug else: userdata = anondict @@ -138,23 +143,30 @@ async def migrate(entry, storage): r = { "layout": type2layout[entry["type"]], "title": entry["title"], - "authors": [author, ], + "authors": [ + author, + ], "slug": get_shout_slug(entry), "cover": ( - "https://assets.discours.io/unsafe/1600x/" + - entry["thumborId"] if entry.get("thumborId") else entry.get("image", {}).get("url") + "https://assets.discours.io/unsafe/1600x/" + entry["thumborId"] + if entry.get("thumborId") + else entry.get("image", {}).get("url") ), "visibility": "public" if entry.get("published") else "authors", - "publishedAt": date_parse(entry.get("publishedAt")) if entry.get("published") else None, - "deletedAt": date_parse(entry.get("deletedAt")) if entry.get("deletedAt") else None, + "publishedAt": date_parse(entry.get("publishedAt")) + if entry.get("published") + else None, + "deletedAt": date_parse(entry.get("deletedAt")) + if entry.get("deletedAt") + else None, "createdAt": date_parse(entry.get("createdAt", OLD_DATE)), "updatedAt": date_parse(entry["updatedAt"]) if "updatedAt" in entry else ts, "topics": await add_topics_follower(entry, storage, author), - "body": extract_html(entry, cleanup=True) + "body": extract_html(entry, cleanup=True), } # main topic patch - r['mainTopic'] = r['topics'][0] + r["mainTopic"] = r["topics"][0] # published author auto-confirm if entry.get("published"): @@ -177,14 +189,16 @@ async def migrate(entry, storage): shout_dict["oid"] = entry.get("_id", "") shout = await create_shout(shout_dict) except IntegrityError as e: - print('[migration] create_shout integrity error', e) + print("[migration] create_shout integrity error", e) shout = await resolve_create_shout(shout_dict) except Exception as e: raise Exception(e) # udpate data shout_dict = shout.dict() - shout_dict["authors"] = [author.dict(), ] + shout_dict["authors"] = [ + author.dict(), + ] # shout topics aftermath shout_dict["topics"] = await topics_aftermath(r, storage) @@ -193,7 +207,9 @@ async def migrate(entry, storage): await content_ratings_to_reactions(entry, shout_dict["slug"]) # shout views - await ViewedStorage.increment(shout_dict["slug"], amount=entry.get("views", 1), viewer='old-discours') + await ViewedStorage.increment( + shout_dict["slug"], amount=entry.get("views", 1), viewer="old-discours" + ) # del shout_dict['ratings'] storage["shouts"]["by_oid"][entry["_id"]] = shout_dict @@ -205,7 +221,9 @@ async def add_topics_follower(entry, storage, user): topics = set([]) category = entry.get("category") topics_by_oid = storage["topics"]["by_oid"] - oids = [category, ] + entry.get("tags", []) + oids = [ + category, + ] + entry.get("tags", []) for toid in oids: tslug = topics_by_oid.get(toid, {}).get("slug") if tslug: @@ -217,23 +235,20 @@ async def add_topics_follower(entry, storage, user): try: tpc = session.query(Topic).where(Topic.slug == tpcslug).first() if tpc: - tf = session.query( - TopicFollower - ).where( - TopicFollower.follower == user.id - ).filter( - TopicFollower.topic == tpc.id - ).first() + tf = ( + session.query(TopicFollower) + .where(TopicFollower.follower == user.id) + .filter(TopicFollower.topic == tpc.id) + .first() + ) if not tf: tf = TopicFollower.create( - topic=tpc.id, - follower=user.id, - auto=True + topic=tpc.id, follower=user.id, auto=True ) session.add(tf) session.commit() except IntegrityError: - print('[migration.shout] hidden by topic ' + tpc.slug) + print("[migration.shout] hidden by topic " + tpc.slug) # main topic maintopic = storage["replacements"].get(topics_by_oid.get(category, {}).get("slug")) if maintopic in ttt: @@ -254,7 +269,7 @@ async def process_user(userdata, storage, oid): if not user: try: slug = userdata["slug"].lower().strip() - slug = re.sub('[^0-9a-zA-Z]+', '-', slug) + slug = re.sub("[^0-9a-zA-Z]+", "-", slug) userdata["slug"] = slug user = User.create(**userdata) session.add(user) @@ -263,7 +278,9 @@ async def process_user(userdata, storage, oid): print(f"[migration] user creating with slug {userdata['slug']}") print("[migration] from userdata") print(userdata) - raise Exception("[migration] cannot create user in content_items.get_user()") + raise Exception( + "[migration] cannot create user in content_items.get_user()" + ) if user.id == 946: print("[migration] ***************** ALPINA") if user.id == 2: @@ -282,9 +299,9 @@ async def resolve_create_shout(shout_dict): s = session.query(Shout).filter(Shout.slug == shout_dict["slug"]).first() bump = False if s: - if s.createdAt != shout_dict['createdAt']: + if s.createdAt != shout_dict["createdAt"]: # create new with different slug - shout_dict["slug"] += '-' + shout_dict["layout"] + shout_dict["slug"] += "-" + shout_dict["layout"] try: await create_shout(shout_dict) except IntegrityError as e: @@ -375,7 +392,7 @@ async def content_ratings_to_reactions(entry, slug): if content_rating["value"] > 0 else ReactionKind.DISLIKE, "createdBy": rater.id, - "shout": shout.id + "shout": shout.id, } reaction = ( session.query(Reaction) @@ -385,7 +402,11 @@ async def content_ratings_to_reactions(entry, slug): .first() ) if reaction: - k = ReactionKind.AGREE if content_rating["value"] > 0 else ReactionKind.DISAGREE + k = ( + ReactionKind.AGREE + if content_rating["value"] > 0 + else ReactionKind.DISAGREE + ) reaction_dict["kind"] = k reaction.update(reaction_dict) session.add(reaction) diff --git a/migration/tables/remarks.py b/migration/tables/remarks.py index 026b95c6..9a426346 100644 --- a/migration/tables/remarks.py +++ b/migration/tables/remarks.py @@ -1,38 +1,30 @@ -from base.orm import local_session +from services.db import local_session from migration.extract import extract_md from migration.html2text import html2text from orm.reaction import Reaction, ReactionKind def migrate(entry, storage): - post_oid = entry['contentItem'] + post_oid = entry["contentItem"] print(post_oid) - shout_dict = storage['shouts']['by_oid'].get(post_oid) + shout_dict = storage["shouts"]["by_oid"].get(post_oid) if shout_dict: - print(shout_dict['body']) + print(shout_dict["body"]) remark = { - "shout": shout_dict['id'], - "body": extract_md( - html2text(entry['body']), - shout_dict - ), - "kind": ReactionKind.REMARK + "shout": shout_dict["id"], + "body": extract_md(html2text(entry["body"]), shout_dict), + "kind": ReactionKind.REMARK, } - if entry.get('textBefore'): - remark['range'] = str( - shout_dict['body'] - .index( - entry['textBefore'] or '' - ) - ) + ':' + str( - shout_dict['body'] - .index( - entry['textAfter'] or '' - ) + len( - entry['textAfter'] or '' - ) + if entry.get("textBefore"): + remark["range"] = ( + str(shout_dict["body"].index(entry["textBefore"] or "")) + + ":" + + str( + shout_dict["body"].index(entry["textAfter"] or "") + + len(entry["textAfter"] or "") ) + ) with local_session() as session: rmrk = Reaction.create(**remark) diff --git a/migration/tables/topics.py b/migration/tables/topics.py index 17804376..9fb5c45f 100644 --- a/migration/tables/topics.py +++ b/migration/tables/topics.py @@ -1,4 +1,4 @@ -from base.orm import local_session +from services.db import local_session from migration.extract import extract_md from migration.html2text import html2text from orm import Topic @@ -10,7 +10,7 @@ def migrate(entry): "slug": entry["slug"], "oid": entry["_id"], "title": entry["title"].replace(" ", " "), - "body": extract_md(html2text(body_orig)) + "body": extract_md(html2text(body_orig)), } with local_session() as session: diff --git a/migration/tables/users.py b/migration/tables/users.py index d7a0f260..9e85a3df 100644 --- a/migration/tables/users.py +++ b/migration/tables/users.py @@ -4,7 +4,7 @@ from bs4 import BeautifulSoup from dateutil.parser import parse from sqlalchemy.exc import IntegrityError -from base.orm import local_session +from services.db import local_session from orm.user import AuthorFollower, User, UserRating @@ -19,12 +19,13 @@ def migrate(entry): "username": email, "email": email, "createdAt": parse(entry["createdAt"]), - "emailConfirmed": ("@discours.io" in email) or bool(entry["emails"][0]["verified"]), + "emailConfirmed": ("@discours.io" in email) + or bool(entry["emails"][0]["verified"]), "muted": False, # amnesty "bio": entry["profile"].get("bio", ""), "links": [], "name": "anonymous", - "password": entry["services"]["password"].get("bcrypt") + "password": entry["services"]["password"].get("bcrypt"), } if "updatedAt" in entry: @@ -34,9 +35,13 @@ def migrate(entry): if entry.get("profile"): # slug slug = entry["profile"].get("path").lower() - slug = re.sub('[^0-9a-zA-Z]+', '-', slug).strip() + slug = re.sub("[^0-9a-zA-Z]+", "-", slug).strip() user_dict["slug"] = slug - bio = (entry.get("profile", {"bio": ""}).get("bio") or "").replace('\(', '(').replace('\)', ')') + bio = ( + (entry.get("profile", {"bio": ""}).get("bio") or "") + .replace("\(", "(") + .replace("\)", ")") + ) bio_text = BeautifulSoup(bio, features="lxml").text if len(bio_text) > 120: @@ -115,7 +120,7 @@ def post_migrate(): "slug": "old-discours", "username": "old-discours", "email": "old@discours.io", - "name": "Просмотры на старой версии сайта" + "name": "Просмотры на старой версии сайта", } with local_session() as session: @@ -148,11 +153,9 @@ def migrate_2stage(entry, id_map): } user_rating = UserRating.create(**user_rating_dict) - if user_rating_dict['value'] > 0: + if user_rating_dict["value"] > 0: af = AuthorFollower.create( - author=user.id, - follower=rater.id, - auto=True + author=user.id, follower=rater.id, auto=True ) session.add(af) session.add(user_rating) diff --git a/orm/__init__.py b/orm/__init__.py index bd2c9fb7..6501baf4 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -1,4 +1,4 @@ -from base.orm import Base, engine +from services.db import Base, engine from orm.community import Community from orm.notification import Notification from orm.rbac import Operation, Resource, Permission, Role diff --git a/orm/collection.py b/orm/collection.py index c9975b62..e75daf1a 100644 --- a/orm/collection.py +++ b/orm/collection.py @@ -2,7 +2,7 @@ from datetime import datetime from sqlalchemy import Column, DateTime, ForeignKey, String -from base.orm import Base +from services.db import Base class ShoutCollection(Base): diff --git a/orm/community.py b/orm/community.py index b55b857f..790957b6 100644 --- a/orm/community.py +++ b/orm/community.py @@ -1,7 +1,7 @@ from datetime import datetime from sqlalchemy import Column, String, ForeignKey, DateTime -from base.orm import Base, local_session +from services.db import Base, local_session class CommunityFollower(Base): @@ -30,12 +30,10 @@ class Community(Base): @staticmethod def init_table(): with local_session() as session: - d = ( - session.query(Community).filter(Community.slug == "discours").first() - ) + d = session.query(Community).filter(Community.slug == "discours").first() if not d: d = Community.create(name="Дискурс", slug="discours") session.add(d) session.commit() Community.default_community = d - print('[orm] default community id: %s' % d.id) + print("[orm] default community id: %s" % d.id) diff --git a/orm/notification.py b/orm/notification.py index 41914983..d0b6d563 100644 --- a/orm/notification.py +++ b/orm/notification.py @@ -1,6 +1,6 @@ from datetime import datetime from sqlalchemy import Column, String, JSON, ForeignKey, DateTime, Boolean -from base.orm import Base +from services.db import Base class Notification(Base): diff --git a/orm/rbac.py b/orm/rbac.py index 29ade72e..f6667567 100644 --- a/orm/rbac.py +++ b/orm/rbac.py @@ -3,7 +3,7 @@ import warnings from sqlalchemy import String, Column, ForeignKey, UniqueConstraint, TypeDecorator from sqlalchemy.orm import relationship -from base.orm import Base, REGISTRY, engine, local_session +from services.db import Base, REGISTRY, engine, local_session # Role Based Access Control # @@ -130,7 +130,16 @@ class Resource(Base): @staticmethod def init_table(): with local_session() as session: - for res in ["shout", "topic", "reaction", "chat", "message", "invite", "community", "user"]: + for res in [ + "shout", + "topic", + "reaction", + "chat", + "message", + "invite", + "community", + "user", + ]: r = session.query(Resource).filter(Resource.name == res).first() if not r: r = Resource.create(name=res, resourceClass=res) diff --git a/orm/reaction.py b/orm/reaction.py index 1c129e23..f5af799a 100644 --- a/orm/reaction.py +++ b/orm/reaction.py @@ -3,7 +3,7 @@ from enum import Enum as Enumeration from sqlalchemy import Column, DateTime, Enum, ForeignKey, String -from base.orm import Base +from services.db import Base class ReactionKind(Enumeration): @@ -30,11 +30,17 @@ class Reaction(Base): createdAt = Column( DateTime, nullable=False, default=datetime.now, comment="Created at" ) - createdBy = Column(ForeignKey("user.id"), nullable=False, index=True, comment="Sender") + createdBy = Column( + ForeignKey("user.id"), nullable=False, index=True, comment="Sender" + ) updatedAt = Column(DateTime, nullable=True, comment="Updated at") - updatedBy = Column(ForeignKey("user.id"), nullable=True, index=True, comment="Last Editor") + updatedBy = Column( + ForeignKey("user.id"), nullable=True, index=True, comment="Last Editor" + ) deletedAt = Column(DateTime, nullable=True, comment="Deleted at") - deletedBy = Column(ForeignKey("user.id"), nullable=True, index=True, comment="Deleted by") + deletedBy = Column( + ForeignKey("user.id"), nullable=True, index=True, comment="Deleted by" + ) shout = Column(ForeignKey("shout.id"), nullable=False, index=True) replyTo = Column( ForeignKey("reaction.id"), nullable=True, comment="Reply to reaction ID" diff --git a/orm/shout.py b/orm/shout.py index 22381d4c..ef9be246 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -3,7 +3,7 @@ from datetime import datetime from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, JSON from sqlalchemy.orm import column_property, relationship -from base.orm import Base, local_session +from services.db import Base, local_session from orm.reaction import Reaction from orm.topic import Topic from orm.user import User @@ -43,7 +43,9 @@ class Shout(Base): __tablename__ = "shout" # timestamps - createdAt = Column(DateTime, nullable=False, default=datetime.now, comment="Created at") + createdAt = Column( + DateTime, nullable=False, default=datetime.now, comment="Created at" + ) updatedAt = Column(DateTime, nullable=True, comment="Updated at") publishedAt = Column(DateTime, nullable=True) deletedAt = Column(DateTime, nullable=True) @@ -72,7 +74,7 @@ class Shout(Base): # TODO: these field should be used or modified community = Column(ForeignKey("community.id"), default=1) - lang = Column(String, nullable=False, default='ru', comment="Language") + lang = Column(String, nullable=False, default="ru", comment="Language") mainTopic = Column(ForeignKey("topic.slug"), nullable=True) visibility = Column(String, nullable=True) # owner authors community public versionOf = Column(ForeignKey("shout.id"), nullable=True) @@ -87,7 +89,7 @@ class Shout(Base): "slug": "genesis-block", "body": "", "title": "Ничего", - "lang": "ru" + "lang": "ru", } s = Shout.create(**entry) session.add(s) diff --git a/orm/topic.py b/orm/topic.py index a37dc69a..2ae85020 100644 --- a/orm/topic.py +++ b/orm/topic.py @@ -2,7 +2,7 @@ from datetime import datetime from sqlalchemy import Boolean, Column, DateTime, ForeignKey, String -from base.orm import Base +from services.db import Base class TopicFollower(Base): @@ -24,7 +24,5 @@ class Topic(Base): title = Column(String, nullable=False, comment="Title") body = Column(String, nullable=True, comment="Body") pic = Column(String, nullable=True, comment="Picture") - community = Column( - ForeignKey("community.id"), default=1, comment="Community" - ) + community = Column(ForeignKey("community.id"), default=1, comment="Community") oid = Column(String, nullable=True, comment="Old ID") diff --git a/orm/user.py b/orm/user.py index 5aeab90e..f4d4fec7 100644 --- a/orm/user.py +++ b/orm/user.py @@ -3,7 +3,7 @@ from datetime import datetime from sqlalchemy import JSON as JSONType from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String from sqlalchemy.orm import relationship -from base.orm import Base, local_session +from services.db import Base, local_session from orm.rbac import Role diff --git a/resolvers/__init__.py b/resolvers/__init__.py index b7ba1e1e..c9b82ea3 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -8,62 +8,36 @@ from resolvers.auth import ( get_current_user, ) -from resolvers.create.migrate import markdown_body -from resolvers.create.editor import create_shout, delete_shout, update_shout - -from resolvers.zine.profile import ( +from resolvers.migrate import markdown_body +from resolvers.editor import create_shout, delete_shout, update_shout +from resolvers.profile import ( load_authors_by, rate_user, update_profile, - get_authors_all + get_authors_all, ) -from resolvers.zine.reactions import ( +from resolvers.topics import ( + topics_all, + topics_by_community, + topics_by_author, + topic_follow, + topic_unfollow, + get_topic, +) + +from resolvers.reactions import ( create_reaction, delete_reaction, update_reaction, reactions_unfollow, reactions_follow, - load_reactions_by -) -from resolvers.zine.topics import ( - topic_follow, - topic_unfollow, - topics_by_author, - topics_by_community, - topics_all, - get_topic + load_reactions_by, ) -from resolvers.zine.following import ( - follow, - unfollow -) +from resolvers.following import follow, unfollow -from resolvers.zine.load import ( - load_shout, - load_shouts_by -) - -from resolvers.inbox.chats import ( - create_chat, - delete_chat, - update_chat - -) -from resolvers.inbox.messages import ( - create_message, - delete_message, - update_message, - message_generator, - mark_as_read -) -from resolvers.inbox.load import ( - load_chats, - load_messages_by, - load_recipients -) -from resolvers.inbox.search import search_recipients +from resolvers.load import load_shout, load_shouts_by __all__ = [ # auth @@ -74,12 +48,12 @@ __all__ = [ "auth_send_link", "sign_out", "get_current_user", - # zine.profile + # profile "load_authors_by", "rate_user", "update_profile", "get_authors_all", - # zine.load + # load "load_shout", "load_shouts_by", # zine.following @@ -90,7 +64,7 @@ __all__ = [ "update_shout", "delete_shout", "markdown_body", - # zine.topics + # topics "topics_all", "topics_by_community", "topics_by_author", @@ -104,17 +78,4 @@ __all__ = [ "update_reaction", "delete_reaction", "load_reactions_by", - # inbox - "load_chats", - "load_messages_by", - "create_chat", - "delete_chat", - "update_chat", - "create_message", - "delete_message", - "update_message", - "message_generator", - "mark_as_read", - "load_recipients", - "search_recipients" ] diff --git a/resolvers/auth.py b/resolvers/auth.py index 669a56d1..62a854f9 100644 --- a/resolvers/auth.py +++ b/resolvers/auth.py @@ -13,10 +13,15 @@ from auth.email import send_auth_email from auth.identity import Identity, Password from auth.jwtcodec import JWTCodec from auth.tokenstorage import TokenStorage -from base.exceptions import (BaseHttpException, InvalidPassword, InvalidToken, - ObjectNotExist, Unauthorized) -from base.orm import local_session -from base.resolvers import mutation, query +from services.exceptions import ( + BaseHttpException, + InvalidPassword, + InvalidToken, + ObjectNotExist, + Unauthorized, +) +from services.db import local_session +from services.schema import mutation, query from orm import Role, User from resolvers.zine.profile import user_subscriptions from settings import SESSION_TOKEN_HEADER, FRONTEND_URL @@ -44,7 +49,7 @@ async def get_current_user(_, info): async def confirm_email(_, info, token): """confirm owning email address""" try: - print('[resolvers.auth] confirm email by token') + print("[resolvers.auth] confirm email by token") payload = JWTCodec.decode(token) user_id = payload.user_id await TokenStorage.get(f"{user_id}-{payload.username}-{token}") @@ -58,7 +63,7 @@ async def confirm_email(_, info, token): return { "token": session_token, "user": user, - "news": await user_subscriptions(user.id) + "news": await user_subscriptions(user.id), } except InvalidToken as e: raise InvalidToken(e.message) @@ -71,9 +76,9 @@ async def confirm_email_handler(request): token = request.path_params["token"] # one time request.session["token"] = token res = await confirm_email(None, {}, token) - print('[resolvers.auth] confirm_email request: %r' % request) + print("[resolvers.auth] confirm_email request: %r" % request) if "error" in res: - raise BaseHttpException(res['error']) + raise BaseHttpException(res["error"]) else: response = RedirectResponse(url=FRONTEND_URL) response.set_cookie("token", res["token"]) # session token @@ -90,22 +95,22 @@ def create_user(user_dict): def generate_unique_slug(src): - print('[resolvers.auth] generating slug from: ' + src) + print("[resolvers.auth] generating slug from: " + src) slug = translit(src, "ru", reversed=True).replace(".", "-").lower() - slug = re.sub('[^0-9a-zA-Z]+', '-', slug) + slug = re.sub("[^0-9a-zA-Z]+", "-", slug) if slug != src: - print('[resolvers.auth] translited name: ' + slug) + print("[resolvers.auth] translited name: " + slug) c = 1 with local_session() as session: user = session.query(User).where(User.slug == slug).first() while user: user = session.query(User).where(User.slug == slug).first() - slug = slug + '-' + str(c) + slug = slug + "-" + str(c) c += 1 if not user: unique_slug = slug - print('[resolvers.auth] ' + unique_slug) - return quote_plus(unique_slug.replace('\'', '')).replace('+', '-') + print("[resolvers.auth] " + unique_slug) + return quote_plus(unique_slug.replace("'", "")).replace("+", "-") @mutation.field("registerUser") @@ -120,12 +125,12 @@ async def register_by_email(_, _info, email: str, password: str = "", name: str slug = generate_unique_slug(name) user = session.query(User).where(User.slug == slug).first() if user: - slug = generate_unique_slug(email.split('@')[0]) + slug = generate_unique_slug(email.split("@")[0]) user_dict = { "email": email, "username": email, # will be used to store phone number or some messenger network id "name": name, - "slug": slug + "slug": slug, } if password: user_dict["password"] = Password.encode(password) @@ -182,7 +187,9 @@ async def login(_, info, email: str, password: str = "", lang: str = "ru"): } except InvalidPassword: print(f"[auth] {email}: invalid password") - raise InvalidPassword("invalid password") # contains webserver status + raise InvalidPassword( + "invalid password" + ) # contains webserver status # return {"error": "invalid password"} diff --git a/resolvers/create/editor.py b/resolvers/editor.py similarity index 62% rename from resolvers/create/editor.py rename to resolvers/editor.py index c81ff404..da5de73f 100644 --- a/resolvers/create/editor.py +++ b/resolvers/editor.py @@ -5,8 +5,8 @@ from sqlalchemy.orm import joinedload from auth.authenticate import login_required from auth.credentials import AuthCredentials -from base.orm import local_session -from base.resolvers import mutation +from services.db import local_session +from services.schema import mutation from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.topic import Topic from resolvers.zine.reactions import reactions_follow, reactions_unfollow @@ -18,21 +18,25 @@ async def create_shout(_, info, inp): auth: AuthCredentials = info.context["request"].auth with local_session() as session: - topics = session.query(Topic).filter(Topic.slug.in_(inp.get('topics', []))).all() + topics = ( + session.query(Topic).filter(Topic.slug.in_(inp.get("topics", []))).all() + ) - new_shout = Shout.create(**{ - "title": inp.get("title"), - "subtitle": inp.get('subtitle'), - "lead": inp.get('lead'), - "description": inp.get('description'), - "body": inp.get("body", ''), - "layout": inp.get("layout"), - "authors": inp.get("authors", []), - "slug": inp.get("slug"), - "mainTopic": inp.get("mainTopic"), - "visibility": "owner", - "createdBy": auth.user_id - }) + new_shout = Shout.create( + **{ + "title": inp.get("title"), + "subtitle": inp.get("subtitle"), + "lead": inp.get("lead"), + "description": inp.get("description"), + "body": inp.get("body", ""), + "layout": inp.get("layout"), + "authors": inp.get("authors", []), + "slug": inp.get("slug"), + "mainTopic": inp.get("mainTopic"), + "visibility": "owner", + "createdBy": auth.user_id, + } + ) for topic in topics: t = ShoutTopic.create(topic=topic.id, shout=new_shout.id) @@ -64,10 +68,15 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False): auth: AuthCredentials = info.context["request"].auth with local_session() as session: - shout = session.query(Shout).options( - joinedload(Shout.authors), - joinedload(Shout.topics), - ).filter(Shout.id == shout_id).first() + shout = ( + session.query(Shout) + .options( + joinedload(Shout.authors), + joinedload(Shout.topics), + ) + .filter(Shout.id == shout_id) + .first() + ) if not shout: return {"error": "shout not found"} @@ -82,7 +91,9 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False): del shout_input["topics"] new_topics_to_link = [] - new_topics = [topic_input for topic_input in topics_input if topic_input["id"] < 0] + new_topics = [ + topic_input for topic_input in topics_input if topic_input["id"] < 0 + ] for new_topic in new_topics: del new_topic["id"] @@ -94,24 +105,40 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False): session.commit() for new_topic_to_link in new_topics_to_link: - created_unlinked_topic = ShoutTopic.create(shout=shout.id, topic=new_topic_to_link.id) + created_unlinked_topic = ShoutTopic.create( + shout=shout.id, topic=new_topic_to_link.id + ) session.add(created_unlinked_topic) - existing_topics_input = [topic_input for topic_input in topics_input if topic_input.get("id", 0) > 0] - existing_topic_to_link_ids = [existing_topic_input["id"] for existing_topic_input in existing_topics_input - if existing_topic_input["id"] not in [topic.id for topic in shout.topics]] + existing_topics_input = [ + topic_input + for topic_input in topics_input + if topic_input.get("id", 0) > 0 + ] + existing_topic_to_link_ids = [ + existing_topic_input["id"] + for existing_topic_input in existing_topics_input + if existing_topic_input["id"] + not in [topic.id for topic in shout.topics] + ] for existing_topic_to_link_id in existing_topic_to_link_ids: - created_unlinked_topic = ShoutTopic.create(shout=shout.id, topic=existing_topic_to_link_id) + created_unlinked_topic = ShoutTopic.create( + shout=shout.id, topic=existing_topic_to_link_id + ) session.add(created_unlinked_topic) - topic_to_unlink_ids = [topic.id for topic in shout.topics - if topic.id not in [topic_input["id"] for topic_input in existing_topics_input]] + topic_to_unlink_ids = [ + topic.id + for topic in shout.topics + if topic.id + not in [topic_input["id"] for topic_input in existing_topics_input] + ] shout_topics_to_remove = session.query(ShoutTopic).filter( and_( ShoutTopic.shout == shout.id, - ShoutTopic.topic.in_(topic_to_unlink_ids) + ShoutTopic.topic.in_(topic_to_unlink_ids), ) ) @@ -120,13 +147,13 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False): shout_input["mainTopic"] = shout_input["mainTopic"]["slug"] - if shout_input["mainTopic"] == '': + if shout_input["mainTopic"] == "": del shout_input["mainTopic"] shout.update(shout_input) updated = True - if publish and shout.visibility == 'owner': + if publish and shout.visibility == "owner": shout.visibility = "community" shout.publishedAt = datetime.now(tz=timezone.utc) updated = True diff --git a/resolvers/zine/following.py b/resolvers/following.py similarity index 66% rename from resolvers/zine/following.py rename to resolvers/following.py index b2e039f1..a895f666 100644 --- a/resolvers/zine/following.py +++ b/resolvers/following.py @@ -1,8 +1,9 @@ import asyncio -from base.orm import local_session -from base.resolvers import mutation, subscription +from services.db import local_session +from services.schema import mutation, subscription from auth.authenticate import login_required from auth.credentials import AuthCredentials + # from resolvers.community import community_follow, community_unfollow from orm.user import AuthorFollower from orm.topic import TopicFollower @@ -22,20 +23,20 @@ async def follow(_, info, what, slug): try: if what == "AUTHOR": if author_follow(auth.user_id, slug): - result = FollowingResult("NEW", 'author', slug) - await FollowingManager.push('author', result) + result = FollowingResult("NEW", "author", slug) + await FollowingManager.push("author", result) elif what == "TOPIC": if topic_follow(auth.user_id, slug): - result = FollowingResult("NEW", 'topic', slug) - await FollowingManager.push('topic', result) + result = FollowingResult("NEW", "topic", slug) + await FollowingManager.push("topic", result) elif what == "COMMUNITY": if False: # TODO: use community_follow(auth.user_id, slug): - result = FollowingResult("NEW", 'community', slug) - await FollowingManager.push('community', result) + result = FollowingResult("NEW", "community", slug) + await FollowingManager.push("community", result) elif what == "REACTIONS": if reactions_follow(auth.user_id, slug): - result = FollowingResult("NEW", 'shout', slug) - await FollowingManager.push('shout', result) + result = FollowingResult("NEW", "shout", slug) + await FollowingManager.push("shout", result) except Exception as e: print(Exception(e)) return {"error": str(e)} @@ -51,20 +52,20 @@ async def unfollow(_, info, what, slug): try: if what == "AUTHOR": if author_unfollow(auth.user_id, slug): - result = FollowingResult("DELETED", 'author', slug) - await FollowingManager.push('author', result) + result = FollowingResult("DELETED", "author", slug) + await FollowingManager.push("author", result) elif what == "TOPIC": if topic_unfollow(auth.user_id, slug): - result = FollowingResult("DELETED", 'topic', slug) - await FollowingManager.push('topic', result) + result = FollowingResult("DELETED", "topic", slug) + await FollowingManager.push("topic", result) elif what == "COMMUNITY": if False: # TODO: use community_unfollow(auth.user_id, slug): - result = FollowingResult("DELETED", 'community', slug) - await FollowingManager.push('community', result) + result = FollowingResult("DELETED", "community", slug) + await FollowingManager.push("community", result) elif what == "REACTIONS": if reactions_unfollow(auth.user_id, slug): - result = FollowingResult("DELETED", 'shout', slug) - await FollowingManager.push('shout', result) + result = FollowingResult("DELETED", "shout", slug) + await FollowingManager.push("shout", result) except Exception as e: return {"error": str(e)} @@ -82,23 +83,29 @@ async def shout_generator(_, info: GraphQLResolveInfo): tasks = [] with local_session() as session: - # notify new shout by followed authors - following_topics = session.query(TopicFollower).where(TopicFollower.follower == user_id).all() + following_topics = ( + session.query(TopicFollower) + .where(TopicFollower.follower == user_id) + .all() + ) for topic_id in following_topics: - following_topic = Following('topic', topic_id) - await FollowingManager.register('topic', following_topic) + following_topic = Following("topic", topic_id) + await FollowingManager.register("topic", following_topic) following_topic_task = following_topic.queue.get() tasks.append(following_topic_task) # by followed topics - following_authors = session.query(AuthorFollower).where( - AuthorFollower.follower == user_id).all() + following_authors = ( + session.query(AuthorFollower) + .where(AuthorFollower.follower == user_id) + .all() + ) for author_id in following_authors: - following_author = Following('author', author_id) - await FollowingManager.register('author', following_author) + following_author = Following("author", author_id) + await FollowingManager.register("author", following_author) following_author_task = following_author.queue.get() tasks.append(following_author_task) @@ -128,15 +135,18 @@ async def reaction_generator(_, info): user_id = auth.user_id try: with local_session() as session: - followings = session.query(ShoutReactionsFollower.shout).where( - ShoutReactionsFollower.follower == user_id).unique() + followings = ( + session.query(ShoutReactionsFollower.shout) + .where(ShoutReactionsFollower.follower == user_id) + .unique() + ) # notify new reaction tasks = [] for shout_id in followings: - following_shout = Following('shout', shout_id) - await FollowingManager.register('shout', following_shout) + following_shout = Following("shout", shout_id) + await FollowingManager.register("shout", following_shout) following_author_task = following_shout.queue.get() tasks.append(following_author_task) diff --git a/resolvers/inbox/chats.py b/resolvers/inbox/chats.py deleted file mode 100644 index 853defab..00000000 --- a/resolvers/inbox/chats.py +++ /dev/null @@ -1,124 +0,0 @@ -import json -import uuid -from datetime import datetime, timezone - -from auth.authenticate import login_required -from auth.credentials import AuthCredentials -from base.redis import redis -from base.resolvers import mutation -from validations.inbox import Chat - - -@mutation.field("updateChat") -@login_required -async def update_chat(_, info, chat_new: Chat): - """ - updating chat - requires info["request"].user.slug to be in chat["admins"] - - :param info: GraphQLInfo with request - :param chat_new: dict with chat data - :return: Result { error chat } - """ - auth: AuthCredentials = info.context["request"].auth - chat_id = chat_new["id"] - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return { - "error": "chat not exist" - } - chat = dict(json.loads(chat)) - - # TODO - if auth.user_id in chat["admins"]: - chat.update({ - "title": chat_new.get("title", chat["title"]), - "description": chat_new.get("description", chat["description"]), - "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), - "admins": chat_new.get("admins", chat.get("admins") or []), - "users": chat_new.get("users", chat["users"]) - }) - await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat)) - await redis.execute("COMMIT") - - return { - "error": None, - "chat": chat - } - - -@mutation.field("createChat") -@login_required -async def create_chat(_, info, title="", members=[]): - auth: AuthCredentials = info.context["request"].auth - chat = {} - print('create_chat members: %r' % members) - if auth.user_id not in members: - members.append(int(auth.user_id)) - - # reuse chat craeted before if exists - if len(members) == 2 and title == "": - chat = None - print(members) - chatset1 = await redis.execute("SMEMBERS", f"chats_by_user/{members[0]}") - if not chatset1: - chatset1 = set([]) - print(chatset1) - chatset2 = await redis.execute("SMEMBERS", f"chats_by_user/{members[1]}") - if not chatset2: - chatset2 = set([]) - print(chatset2) - chatset = chatset1.intersection(chatset2) - print(chatset) - for c in chatset: - chat = await redis.execute("GET", f"chats/{c.decode('utf-8')}") - if chat: - chat = json.loads(chat) - if chat['title'] == "": - print('[inbox] createChat found old chat') - print(chat) - break - if chat: - return { - "chat": chat, - "error": "existed" - } - - chat_id = str(uuid.uuid4()) - chat = { - "id": chat_id, - "users": members, - "title": title, - "createdBy": auth.user_id, - "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), - "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), - "admins": members if (len(members) == 2 and title == "") else [] - } - - for m in members: - await redis.execute("SADD", f"chats_by_user/{m}", chat_id) - await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) - await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0)) - await redis.execute("COMMIT") - return { - "error": None, - "chat": chat - } - - -@mutation.field("deleteChat") -@login_required -async def delete_chat(_, info, chat_id: str): - auth: AuthCredentials = info.context["request"].auth - - chat = await redis.execute("GET", f"/chats/{chat_id}") - if chat: - chat = dict(json.loads(chat)) - if auth.user_id in chat['admins']: - await redis.execute("DEL", f"chats/{chat_id}") - await redis.execute("SREM", "chats_by_user/" + str(auth.user_id), chat_id) - await redis.execute("COMMIT") - else: - return { - "error": "chat not exist" - } diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py deleted file mode 100644 index a0d41721..00000000 --- a/resolvers/inbox/load.py +++ /dev/null @@ -1,152 +0,0 @@ -import json -# from datetime import datetime, timedelta, timezone - -from auth.authenticate import login_required -from auth.credentials import AuthCredentials -from base.redis import redis -from base.orm import local_session -from base.resolvers import query -from orm.user import User -from resolvers.zine.profile import followed_authors -from .unread import get_unread_counter - - -async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=[]): - ''' load :limit messages for :chat_id with :offset ''' - messages = [] - message_ids = [] - if ids: - message_ids += ids - try: - if limit: - mids = await redis.lrange(f"chats/{chat_id}/message_ids", - offset, - offset + limit - ) - mids = [mid.decode("utf-8") for mid in mids] - message_ids += mids - except Exception as e: - print(e) - if message_ids: - message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids] - messages = await redis.mget(*message_keys) - messages = [json.loads(msg.decode('utf-8')) for msg in messages] - replies = [] - for m in messages: - rt = m.get('replyTo') - if rt: - rt = int(rt) - if rt not in message_ids: - replies.append(rt) - if replies: - messages += await load_messages(chat_id, limit=0, ids=replies) - return messages - - -@query.field("loadChats") -@login_required -async def load_chats(_, info, limit: int = 50, offset: int = 0): - """ load :limit chats of current user with :offset """ - auth: AuthCredentials = info.context["request"].auth - - cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) - if cids: - cids = list(cids)[offset:offset + limit] - if not cids: - print('[inbox.load] no chats were found') - cids = [] - onliners = await redis.execute("SMEMBERS", "users-online") - if not onliners: - onliners = [] - chats = [] - for cid in cids: - cid = cid.decode("utf-8") - c = await redis.execute("GET", "chats/" + cid) - if c: - c = dict(json.loads(c)) - c['messages'] = await load_messages(cid, 5, 0) - c['unread'] = await get_unread_counter(cid, auth.user_id) - with local_session() as session: - c['members'] = [] - for uid in c["users"]: - a = session.query(User).where(User.id == uid).first() - if a: - c['members'].append({ - "id": a.id, - "slug": a.slug, - "userpic": a.userpic, - "name": a.name, - "lastSeen": a.lastSeen, - "online": a.id in onliners - }) - chats.append(c) - return { - "chats": chats, - "error": None - } - - -@query.field("loadMessagesBy") -@login_required -async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): - ''' load :limit messages of :chat_id with :offset ''' - - auth: AuthCredentials = info.context["request"].auth - userchats = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) - userchats = [c.decode('utf-8') for c in userchats] - # print('[inbox] userchats: %r' % userchats) - if userchats: - # print('[inbox] loading messages by...') - messages = [] - by_chat = by.get('chat') - if by_chat in userchats: - chat = await redis.execute("GET", f"chats/{by_chat}") - # print(chat) - if not chat: - return { - "messages": [], - "error": "chat not exist" - } - # everyone's messages in filtered chat - messages = await load_messages(by_chat, limit, offset) - return { - "messages": sorted( - list(messages), - key=lambda m: m['createdAt'] - ), - "error": None - } - else: - return { - "error": "Cannot access messages of this chat" - } - - -@query.field("loadRecipients") -async def load_recipients(_, info, limit=50, offset=0): - chat_users = [] - auth: AuthCredentials = info.context["request"].auth - onliners = await redis.execute("SMEMBERS", "users-online") - if not onliners: - onliners = [] - try: - chat_users += await followed_authors(auth.user_id) - limit = limit - len(chat_users) - except Exception: - pass - with local_session() as session: - chat_users += session.query(User).where(User.emailConfirmed).limit(limit).offset(offset) - members = [] - for a in chat_users: - members.append({ - "id": a.id, - "slug": a.slug, - "userpic": a.userpic, - "name": a.name, - "lastSeen": a.lastSeen, - "online": a.id in onliners - }) - return { - "members": members, - "error": None - } diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py deleted file mode 100644 index 44ff1f03..00000000 --- a/resolvers/inbox/messages.py +++ /dev/null @@ -1,179 +0,0 @@ -import asyncio -import json -from typing import Any -from datetime import datetime, timezone -from graphql.type import GraphQLResolveInfo -from auth.authenticate import login_required -from auth.credentials import AuthCredentials -from base.redis import redis -from base.resolvers import mutation, subscription -from services.following import FollowingManager, FollowingResult, Following -from validations.inbox import Message - - -@mutation.field("createMessage") -@login_required -async def create_message(_, info, chat: str, body: str, replyTo=None): - """ create message with :body for :chat_id replying to :replyTo optionally """ - auth: AuthCredentials = info.context["request"].auth - - chat = await redis.execute("GET", f"chats/{chat}") - if not chat: - return { - "error": "chat is not exist" - } - else: - chat = dict(json.loads(chat)) - message_id = await redis.execute("GET", f"chats/{chat['id']}/next_message_id") - message_id = int(message_id) - new_message = { - "chatId": chat['id'], - "id": message_id, - "author": auth.user_id, - "body": body, - "createdAt": int(datetime.now(tz=timezone.utc).timestamp()) - } - if replyTo: - new_message['replyTo'] = replyTo - chat['updatedAt'] = new_message['createdAt'] - await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) - print(f"[inbox] creating message {new_message}") - await redis.execute( - "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) - ) - await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id)) - await redis.execute("SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1)) - - users = chat["users"] - for user_slug in users: - await redis.execute( - "LPUSH", f"chats/{chat['id']}/unread/{user_slug}", str(message_id) - ) - - result = FollowingResult("NEW", 'chat', new_message) - await FollowingManager.push('chat', result) - - return { - "message": new_message, - "error": None - } - - -@mutation.field("updateMessage") -@login_required -async def update_message(_, info, chat_id: str, message_id: int, body: str): - auth: AuthCredentials = info.context["request"].auth - - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return {"error": "chat not exist"} - - message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") - if not message: - return {"error": "message not exist"} - - message = json.loads(message) - if message["author"] != auth.user_id: - return {"error": "access denied"} - - message["body"] = body - message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp()) - - await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) - - result = FollowingResult("UPDATED", 'chat', message) - await FollowingManager.push('chat', result) - - return { - "message": message, - "error": None - } - - -@mutation.field("deleteMessage") -@login_required -async def delete_message(_, info, chat_id: str, message_id: int): - auth: AuthCredentials = info.context["request"].auth - - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return {"error": "chat not exist"} - chat = json.loads(chat) - - message = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") - if not message: - return {"error": "message not exist"} - message = json.loads(message) - if message["author"] != auth.user_id: - return {"error": "access denied"} - - await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)) - await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") - - users = chat["users"] - for user_id in users: - await redis.execute("LREM", f"chats/{chat_id}/unread/{user_id}", 0, str(message_id)) - - result = FollowingResult("DELETED", 'chat', message) - await FollowingManager.push(result) - - return {} - - -@mutation.field("markAsRead") -@login_required -async def mark_as_read(_, info, chat_id: str, messages: [int]): - auth: AuthCredentials = info.context["request"].auth - - chat = await redis.execute("GET", f"chats/{chat_id}") - if not chat: - return {"error": "chat not exist"} - - chat = json.loads(chat) - users = set(chat["users"]) - if auth.user_id not in users: - return {"error": "access denied"} - - for message_id in messages: - await redis.execute("LREM", f"chats/{chat_id}/unread/{auth.user_id}", 0, str(message_id)) - - return { - "error": None - } - - -@subscription.source("newMessage") -async def message_generator(_, info: GraphQLResolveInfo): - print(f"[resolvers.messages] generator {info}") - auth: AuthCredentials = info.context["request"].auth - user_id = auth.user_id - try: - user_following_chats = await redis.execute("GET", f"chats_by_user/{user_id}") - if user_following_chats: - user_following_chats = list(json.loads(user_following_chats)) # chat ids - else: - user_following_chats = [] - tasks = [] - updated = {} - for chat_id in user_following_chats: - chat = await redis.execute("GET", f"chats/{chat_id}") - updated[chat_id] = chat['updatedAt'] - user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True) - - for chat_id in user_following_chats_sorted: - following_chat = Following('chat', chat_id) - await FollowingManager.register('chat', following_chat) - chat_task = following_chat.queue.get() - tasks.append(chat_task) - - while True: - msg = await asyncio.gather(*tasks) - yield msg - finally: - await FollowingManager.remove('chat', following_chat) - - -@subscription.field("newMessage") -@login_required -async def message_resolver(message: Message, info: Any): - return message diff --git a/resolvers/inbox/search.py b/resolvers/inbox/search.py deleted file mode 100644 index 1ca340e5..00000000 --- a/resolvers/inbox/search.py +++ /dev/null @@ -1,95 +0,0 @@ -import json -from datetime import datetime, timezone, timedelta -from auth.authenticate import login_required -from auth.credentials import AuthCredentials -from base.redis import redis -from base.resolvers import query -from base.orm import local_session -from orm.user import AuthorFollower, User -from resolvers.inbox.load import load_messages - - -@query.field("searchRecipients") -@login_required -async def search_recipients(_, info, query: str, limit: int = 50, offset: int = 0): - result = [] - # TODO: maybe redis scan? - auth: AuthCredentials = info.context["request"].auth - talk_before = await redis.execute("GET", f"/chats_by_user/{auth.user_id}") - if talk_before: - talk_before = list(json.loads(talk_before))[offset:offset + limit] - for chat_id in talk_before: - members = await redis.execute("GET", f"/chats/{chat_id}/users") - if members: - members = list(json.loads(members)) - for member in members: - if member.startswith(query): - if member not in result: - result.append(member) - - more_amount = limit - len(result) - - with local_session() as session: - # followings - result += session.query(AuthorFollower.author).join( - User, User.id == AuthorFollower.follower - ).where( - User.slug.startswith(query) - ).offset(offset + len(result)).limit(more_amount) - - more_amount = limit - # followers - result += session.query(AuthorFollower.follower).join( - User, User.id == AuthorFollower.author - ).where( - User.slug.startswith(query) - ).offset(offset + len(result)).limit(offset + len(result) + limit) - return { - "members": list(result), - "error": None - } - - -@query.field("searchMessages") -@login_required -async def search_user_chats(by, messages, user_id: int, limit, offset): - cids = set([]) - cids.union(set(await redis.execute("SMEMBERS", "chats_by_user/" + str(user_id)))) - messages = [] - - by_author = by.get('author') - if by_author: - # all author's messages - cids.union(set(await redis.execute("SMEMBERS", f"chats_by_user/{by_author}"))) - # author's messages in filtered chat - messages.union(set(filter(lambda m: m["author"] == by_author, list(messages)))) - for c in cids: - c = c.decode('utf-8') - messages = await load_messages(c, limit, offset) - - body_like = by.get('body') - if body_like: - # search in all messages in all user's chats - for c in cids: - # FIXME: use redis scan here - c = c.decode('utf-8') - mmm = await load_messages(c, limit, offset) - for m in mmm: - if body_like in m["body"]: - messages.add(m) - else: - # search in chat's messages - messages.extend(filter(lambda m: body_like in m["body"], list(messages))) - - days = by.get("days") - if days: - messages.extend(filter( - list(messages), - key=lambda m: ( - datetime.now(tz=timezone.utc) - int(m["createdAt"]) < timedelta(days=by["days"]) - ) - )) - return { - "messages": messages, - "error": None - } diff --git a/resolvers/inbox/unread.py b/resolvers/inbox/unread.py deleted file mode 100644 index 7380f7ac..00000000 --- a/resolvers/inbox/unread.py +++ /dev/null @@ -1,22 +0,0 @@ -from base.redis import redis -import json - - -async def get_unread_counter(chat_id: str, user_id: int): - try: - unread = await redis.execute("LLEN", f"chats/{chat_id.decode('utf-8')}/unread/{user_id}") - if unread: - return unread - except Exception: - return 0 - - -async def get_total_unread_counter(user_id: int): - chats = await redis.execute("GET", f"chats_by_user/{str(user_id)}") - unread = 0 - if chats: - chats = json.loads(chats) - for chat_id in chats: - n = await get_unread_counter(chat_id.decode('utf-8'), user_id) - unread += n - return unread diff --git a/resolvers/zine/load.py b/resolvers/load.py similarity index 55% rename from resolvers/zine/load.py rename to resolvers/load.py index 3f91b92d..2444cd41 100644 --- a/resolvers/zine/load.py +++ b/resolvers/load.py @@ -1,13 +1,13 @@ from datetime import datetime, timedelta, timezone from sqlalchemy.orm import joinedload, aliased -from sqlalchemy.sql.expression import desc, asc, select, func, case, and_, text, nulls_last +from sqlalchemy.sql.expression import desc, asc, select, func, case, and_, nulls_last from auth.authenticate import login_required from auth.credentials import AuthCredentials -from base.exceptions import ObjectNotExist, OperationNotAllowed -from base.orm import local_session -from base.resolvers import query +from services.exceptions import ObjectNotExist +from services.db import local_session +from services.schema import query from orm import TopicFollower from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutAuthor, ShoutTopic @@ -18,32 +18,32 @@ def add_stat_columns(q): aliased_reaction = aliased(Reaction) q = q.outerjoin(aliased_reaction).add_columns( + func.sum(aliased_reaction.id).label("reacted_stat"), func.sum( - aliased_reaction.id - ).label('reacted_stat'), + case((aliased_reaction.kind == ReactionKind.COMMENT, 1), else_=0) + ).label("commented_stat"), func.sum( case( - (aliased_reaction.kind == ReactionKind.COMMENT, 1), - else_=0 + # do not count comments' reactions + (aliased_reaction.replyTo.is_not(None), 0), + (aliased_reaction.kind == ReactionKind.AGREE, 1), + (aliased_reaction.kind == ReactionKind.DISAGREE, -1), + (aliased_reaction.kind == ReactionKind.PROOF, 1), + (aliased_reaction.kind == ReactionKind.DISPROOF, -1), + (aliased_reaction.kind == ReactionKind.ACCEPT, 1), + (aliased_reaction.kind == ReactionKind.REJECT, -1), + (aliased_reaction.kind == ReactionKind.LIKE, 1), + (aliased_reaction.kind == ReactionKind.DISLIKE, -1), + else_=0, ) - ).label('commented_stat'), - func.sum(case( - # do not count comments' reactions - (aliased_reaction.replyTo.is_not(None), 0), - (aliased_reaction.kind == ReactionKind.AGREE, 1), - (aliased_reaction.kind == ReactionKind.DISAGREE, -1), - (aliased_reaction.kind == ReactionKind.PROOF, 1), - (aliased_reaction.kind == ReactionKind.DISPROOF, -1), - (aliased_reaction.kind == ReactionKind.ACCEPT, 1), - (aliased_reaction.kind == ReactionKind.REJECT, -1), - (aliased_reaction.kind == ReactionKind.LIKE, 1), - (aliased_reaction.kind == ReactionKind.DISLIKE, -1), - else_=0) - ).label('rating_stat'), - func.max(case( - (aliased_reaction.kind != ReactionKind.COMMENT, None), - else_=aliased_reaction.createdAt - )).label('last_comment')) + ).label("rating_stat"), + func.max( + case( + (aliased_reaction.kind != ReactionKind.COMMENT, None), + else_=aliased_reaction.createdAt, + ) + ).label("last_comment"), + ) return q @@ -60,7 +60,7 @@ def apply_filters(q, filters, user_id=None): if filters.get("layout"): q = q.filter(Shout.layout == filters.get("layout")) - if filters.get('excludeLayout'): + if filters.get("excludeLayout"): q = q.filter(Shout.layout != filters.get("excludeLayout")) if filters.get("author"): q = q.filter(Shout.authors.any(slug=filters.get("author"))) @@ -71,7 +71,9 @@ def apply_filters(q, filters, user_id=None): if filters.get("body"): q = q.filter(Shout.body.ilike(f'%{filters.get("body")}%s')) if filters.get("days"): - before = datetime.now(tz=timezone.utc) - timedelta(days=int(filters.get("days")) or 30) + before = datetime.now(tz=timezone.utc) - timedelta( + days=int(filters.get("days")) or 30 + ) q = q.filter(Shout.createdAt > before) return q @@ -87,30 +89,32 @@ async def load_shout(_, info, slug=None, shout_id=None): q = add_stat_columns(q) if slug is not None: - q = q.filter( - Shout.slug == slug - ) + q = q.filter(Shout.slug == slug) if shout_id is not None: - q = q.filter( - Shout.id == shout_id - ) + q = q.filter(Shout.id == shout_id) - q = q.filter( - Shout.deletedAt.is_(None) - ).group_by(Shout.id) + q = q.filter(Shout.deletedAt.is_(None)).group_by(Shout.id) try: - [shout, reacted_stat, commented_stat, rating_stat, last_comment] = session.execute(q).first() + [ + shout, + reacted_stat, + commented_stat, + rating_stat, + last_comment, + ] = session.execute(q).first() shout.stat = { "viewed": shout.views, "reacted": reacted_stat, "commented": commented_stat, - "rating": rating_stat + "rating": rating_stat, } - for author_caption in session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug): + for author_caption in ( + session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug) + ): for author in shout.authors: if author.id == author_caption.user: author.caption = author_caption.caption @@ -142,14 +146,13 @@ async def load_shouts_by(_, info, options): :return: Shout[] """ - q = select(Shout).options( - joinedload(Shout.authors), - joinedload(Shout.topics), - ).where( - and_( - Shout.deletedAt.is_(None), - Shout.layout.is_not(None) + q = ( + select(Shout) + .options( + joinedload(Shout.authors), + joinedload(Shout.topics), ) + .where(and_(Shout.deletedAt.is_(None), Shout.layout.is_not(None))) ) q = add_stat_columns(q) @@ -159,23 +162,36 @@ async def load_shouts_by(_, info, options): order_by = options.get("order_by", Shout.publishedAt) - query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by) + query_order_by = ( + desc(order_by) if options.get("order_by_desc", True) else asc(order_by) + ) offset = options.get("offset", 0) limit = options.get("limit", 10) - q = q.group_by(Shout.id).order_by(nulls_last(query_order_by)).limit(limit).offset(offset) + q = ( + q.group_by(Shout.id) + .order_by(nulls_last(query_order_by)) + .limit(limit) + .offset(offset) + ) shouts = [] with local_session() as session: shouts_map = {} - for [shout, reacted_stat, commented_stat, rating_stat, last_comment] in session.execute(q).unique(): + for [ + shout, + reacted_stat, + commented_stat, + rating_stat, + last_comment, + ] in session.execute(q).unique(): shouts.append(shout) shout.stat = { "viewed": shout.views, "reacted": reacted_stat, "commented": commented_stat, - "rating": rating_stat + "rating": rating_stat, } shouts_map[shout.id] = shout @@ -187,11 +203,13 @@ async def get_drafts(_, info): auth: AuthCredentials = info.context["request"].auth user_id = auth.user_id - q = select(Shout).options( - joinedload(Shout.authors), - joinedload(Shout.topics), - ).where( - and_(Shout.deletedAt.is_(None), Shout.createdBy == user_id) + q = ( + select(Shout) + .options( + joinedload(Shout.authors), + joinedload(Shout.topics), + ) + .where(and_(Shout.deletedAt.is_(None), Shout.createdBy == user_id)) ) q = q.group_by(Shout.id) @@ -210,24 +228,26 @@ async def get_my_feed(_, info, options): auth: AuthCredentials = info.context["request"].auth user_id = auth.user_id - subquery = select(Shout.id).join( - ShoutAuthor - ).join( - AuthorFollower, AuthorFollower.follower == user_id - ).join( - ShoutTopic - ).join( - TopicFollower, TopicFollower.follower == user_id + subquery = ( + select(Shout.id) + .join(ShoutAuthor) + .join(AuthorFollower, AuthorFollower.follower == user_id) + .join(ShoutTopic) + .join(TopicFollower, TopicFollower.follower == user_id) ) - q = select(Shout).options( - joinedload(Shout.authors), - joinedload(Shout.topics), - ).where( - and_( - Shout.publishedAt.is_not(None), - Shout.deletedAt.is_(None), - Shout.id.in_(subquery) + q = ( + select(Shout) + .options( + joinedload(Shout.authors), + joinedload(Shout.topics), + ) + .where( + and_( + Shout.publishedAt.is_not(None), + Shout.deletedAt.is_(None), + Shout.id.in_(subquery), + ) ) ) @@ -236,22 +256,35 @@ async def get_my_feed(_, info, options): order_by = options.get("order_by", Shout.publishedAt) - query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by) + query_order_by = ( + desc(order_by) if options.get("order_by_desc", True) else asc(order_by) + ) offset = options.get("offset", 0) limit = options.get("limit", 10) - q = q.group_by(Shout.id).order_by(nulls_last(query_order_by)).limit(limit).offset(offset) + q = ( + q.group_by(Shout.id) + .order_by(nulls_last(query_order_by)) + .limit(limit) + .offset(offset) + ) shouts = [] with local_session() as session: shouts_map = {} - for [shout, reacted_stat, commented_stat, rating_stat, last_comment] in session.execute(q).unique(): + for [ + shout, + reacted_stat, + commented_stat, + rating_stat, + last_comment, + ] in session.execute(q).unique(): shouts.append(shout) shout.stat = { "viewed": shout.views, "reacted": reacted_stat, "commented": commented_stat, - "rating": rating_stat + "rating": rating_stat, } shouts_map[shout.id] = shout diff --git a/resolvers/create/migrate.py b/resolvers/migrate.py similarity index 86% rename from resolvers/create/migrate.py rename to resolvers/migrate.py index f16341f0..71314b0a 100644 --- a/resolvers/create/migrate.py +++ b/resolvers/migrate.py @@ -1,5 +1,4 @@ - -from base.resolvers import query +from services.schema import query from resolvers.auth import login_required from migration.extract import extract_md diff --git a/resolvers/zine/profile.py b/resolvers/profile.py similarity index 74% rename from resolvers/zine/profile.py rename to resolvers/profile.py index 2856553b..57331999 100644 --- a/resolvers/zine/profile.py +++ b/resolvers/profile.py @@ -5,8 +5,8 @@ from sqlalchemy.orm import aliased, joinedload from auth.authenticate import login_required from auth.credentials import AuthCredentials -from base.orm import local_session -from base.resolvers import mutation, query +from services.orm import local_session +from services.schema import mutation, query from orm.reaction import Reaction from orm.shout import ShoutAuthor, ShoutTopic from orm.topic import Topic @@ -23,36 +23,31 @@ def add_author_stat_columns(q, include_heavy_stat=False): shout_author_aliased = aliased(ShoutAuthor) q = q.outerjoin(shout_author_aliased).add_columns( - func.count(distinct(shout_author_aliased.shout)).label('shouts_stat') + func.count(distinct(shout_author_aliased.shout)).label("shouts_stat") ) q = q.outerjoin(author_followers, author_followers.author == User.id).add_columns( - func.count(distinct(author_followers.follower)).label('followers_stat') + func.count(distinct(author_followers.follower)).label("followers_stat") ) q = q.outerjoin(author_following, author_following.follower == User.id).add_columns( - func.count(distinct(author_following.author)).label('followings_stat') + func.count(distinct(author_following.author)).label("followings_stat") ) if include_heavy_stat: user_rating_aliased = aliased(UserRating) - q = q.outerjoin(user_rating_aliased, user_rating_aliased.user == User.id).add_columns( - func.sum(user_rating_aliased.value).label('rating_stat') - ) + q = q.outerjoin( + user_rating_aliased, user_rating_aliased.user == User.id + ).add_columns(func.sum(user_rating_aliased.value).label("rating_stat")) else: - q = q.add_columns(literal(-1).label('rating_stat')) + q = q.add_columns(literal(-1).label("rating_stat")) if include_heavy_stat: q = q.outerjoin( - Reaction, - and_( - Reaction.createdBy == User.id, - Reaction.body.is_not(None) - )).add_columns( - func.count(distinct(Reaction.id)).label('commented_stat') - ) + Reaction, and_(Reaction.createdBy == User.id, Reaction.body.is_not(None)) + ).add_columns(func.count(distinct(Reaction.id)).label("commented_stat")) else: - q = q.add_columns(literal(-1).label('commented_stat')) + q = q.add_columns(literal(-1).label("commented_stat")) q = q.group_by(User.id) @@ -60,13 +55,19 @@ def add_author_stat_columns(q, include_heavy_stat=False): def add_stat(author, stat_columns): - [shouts_stat, followers_stat, followings_stat, rating_stat, commented_stat] = stat_columns + [ + shouts_stat, + followers_stat, + followings_stat, + rating_stat, + commented_stat, + ] = stat_columns author.stat = { "shouts": shouts_stat, "followers": followers_stat, "followings": followings_stat, "rating": rating_stat, - "commented": commented_stat + "commented": commented_stat, } return author @@ -84,9 +85,15 @@ def get_authors_from_query(q): async def user_subscriptions(user_id: int): return { - "unread": await get_total_unread_counter(user_id), # unread inbox messages counter - "topics": [t.slug for t in await followed_topics(user_id)], # followed topics slugs - "authors": [a.slug for a in await followed_authors(user_id)], # followed authors slugs + "unread": await get_total_unread_counter( + user_id + ), # unread inbox messages counter + "topics": [ + t.slug for t in await followed_topics(user_id) + ], # followed topics slugs + "authors": [ + a.slug for a in await followed_authors(user_id) + ], # followed authors slugs "reactions": await followed_reactions(user_id) # "communities": [c.slug for c in followed_communities(slug)], # communities } @@ -101,13 +108,12 @@ async def followed_discussions(_, info, user_id) -> List[Topic]: async def followed_reactions(user_id): with local_session() as session: user = session.query(User).where(User.id == user_id).first() - return session.query( - Reaction.shout - ).where( - Reaction.createdBy == user.id - ).filter( - Reaction.createdAt > user.lastSeen - ).all() + return ( + session.query(Reaction.shout) + .where(Reaction.createdBy == user.id) + .filter(Reaction.createdAt > user.lastSeen) + .all() + ) # dufok mod (^*^') : @@ -158,10 +164,10 @@ async def user_followers(_, _info, slug) -> List[User]: q = add_author_stat_columns(q) aliased_user = aliased(User) - q = q.join(AuthorFollower, AuthorFollower.follower == User.id).join( - aliased_user, aliased_user.id == AuthorFollower.author - ).where( - aliased_user.slug == slug + q = ( + q.join(AuthorFollower, AuthorFollower.follower == User.id) + .join(aliased_user, aliased_user.id == AuthorFollower.author) + .where(aliased_user.slug == slug) ) return get_authors_from_query(q) @@ -189,15 +195,10 @@ async def update_profile(_, info, profile): with local_session() as session: user = session.query(User).filter(User.id == user_id).one() if not user: - return { - "error": "canoot find user" - } + return {"error": "canoot find user"} user.update(profile) session.commit() - return { - "error": None, - "author": user - } + return {"error": None, "author": user} @mutation.field("rateUser") @@ -208,7 +209,11 @@ async def rate_user(_, info, rated_userslug, value): with local_session() as session: rating = ( session.query(UserRating) - .filter(and_(UserRating.rater == auth.user_id, UserRating.user == rated_userslug)) + .filter( + and_( + UserRating.rater == auth.user_id, UserRating.user == rated_userslug + ) + ) .first() ) if rating: @@ -239,13 +244,10 @@ def author_follow(user_id, slug): def author_unfollow(user_id, slug): with local_session() as session: flw = ( - session.query( - AuthorFollower - ).join(User, User.id == AuthorFollower.author).filter( - and_( - AuthorFollower.follower == user_id, User.slug == slug - ) - ).first() + session.query(AuthorFollower) + .join(User, User.id == AuthorFollower.author) + .filter(and_(AuthorFollower.follower == user_id, User.slug == slug)) + .first() ) if flw: session.delete(flw) @@ -281,7 +283,12 @@ async def load_authors_by(_, info, by, limit, offset): elif by.get("name"): q = q.filter(User.name.ilike(f"%{by['name']}%")) elif by.get("topic"): - q = q.join(ShoutAuthor).join(ShoutTopic).join(Topic).where(Topic.slug == by["topic"]) + q = ( + q.join(ShoutAuthor) + .join(ShoutTopic) + .join(Topic) + .where(Topic.slug == by["topic"]) + ) if by.get("lastSeen"): # in days days_before = datetime.now(tz=timezone.utc) - timedelta(days=by["lastSeen"]) q = q.filter(User.lastSeen > days_before) @@ -289,8 +296,6 @@ async def load_authors_by(_, info, by, limit, offset): days_before = datetime.now(tz=timezone.utc) - timedelta(days=by["createdAt"]) q = q.filter(User.createdAt > days_before) - q = q.order_by( - by.get("order", User.createdAt) - ).limit(limit).offset(offset) + q = q.order_by(by.get("order", User.createdAt)).limit(limit).offset(offset) return get_authors_from_query(q) diff --git a/resolvers/zine/reactions.py b/resolvers/reactions.py similarity index 63% rename from resolvers/zine/reactions.py rename to resolvers/reactions.py index 9ee2f098..8e7fec21 100644 --- a/resolvers/zine/reactions.py +++ b/resolvers/reactions.py @@ -4,9 +4,9 @@ from sqlalchemy.orm import aliased from auth.authenticate import login_required from auth.credentials import AuthCredentials -from base.exceptions import OperationNotAllowed -from base.orm import local_session -from base.resolvers import mutation, query +from services.exceptions import OperationNotAllowed +from services.db import local_session +from services.schema import mutation, query from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutReactionsFollower from orm.user import User @@ -15,27 +15,27 @@ from orm.user import User def add_reaction_stat_columns(q): aliased_reaction = aliased(Reaction) - q = q.outerjoin(aliased_reaction, Reaction.id == aliased_reaction.replyTo).add_columns( - func.sum( - aliased_reaction.id - ).label('reacted_stat'), + q = q.outerjoin( + aliased_reaction, Reaction.id == aliased_reaction.replyTo + ).add_columns( + func.sum(aliased_reaction.id).label("reacted_stat"), + func.sum(case((aliased_reaction.body.is_not(None), 1), else_=0)).label( + "commented_stat" + ), func.sum( case( - (aliased_reaction.body.is_not(None), 1), - else_=0 + (aliased_reaction.kind == ReactionKind.AGREE, 1), + (aliased_reaction.kind == ReactionKind.DISAGREE, -1), + (aliased_reaction.kind == ReactionKind.PROOF, 1), + (aliased_reaction.kind == ReactionKind.DISPROOF, -1), + (aliased_reaction.kind == ReactionKind.ACCEPT, 1), + (aliased_reaction.kind == ReactionKind.REJECT, -1), + (aliased_reaction.kind == ReactionKind.LIKE, 1), + (aliased_reaction.kind == ReactionKind.DISLIKE, -1), + else_=0, ) - ).label('commented_stat'), - func.sum(case( - (aliased_reaction.kind == ReactionKind.AGREE, 1), - (aliased_reaction.kind == ReactionKind.DISAGREE, -1), - (aliased_reaction.kind == ReactionKind.PROOF, 1), - (aliased_reaction.kind == ReactionKind.DISPROOF, -1), - (aliased_reaction.kind == ReactionKind.ACCEPT, 1), - (aliased_reaction.kind == ReactionKind.REJECT, -1), - (aliased_reaction.kind == ReactionKind.LIKE, 1), - (aliased_reaction.kind == ReactionKind.DISLIKE, -1), - else_=0) - ).label('rating_stat')) + ).label("rating_stat"), + ) return q @@ -46,17 +46,19 @@ def reactions_follow(user_id, shout_id: int, auto=False): shout = session.query(Shout).where(Shout.id == shout_id).one() following = ( - session.query(ShoutReactionsFollower).where(and_( - ShoutReactionsFollower.follower == user_id, - ShoutReactionsFollower.shout == shout.id, - )).first() + session.query(ShoutReactionsFollower) + .where( + and_( + ShoutReactionsFollower.follower == user_id, + ShoutReactionsFollower.shout == shout.id, + ) + ) + .first() ) if not following: following = ShoutReactionsFollower.create( - follower=user_id, - shout=shout.id, - auto=auto + follower=user_id, shout=shout.id, auto=auto ) session.add(following) session.commit() @@ -71,10 +73,14 @@ def reactions_unfollow(user_id: int, shout_id: int): shout = session.query(Shout).where(Shout.id == shout_id).one() following = ( - session.query(ShoutReactionsFollower).where(and_( - ShoutReactionsFollower.follower == user_id, - ShoutReactionsFollower.shout == shout.id - )).first() + session.query(ShoutReactionsFollower) + .where( + and_( + ShoutReactionsFollower.follower == user_id, + ShoutReactionsFollower.shout == shout.id, + ) + ) + .first() ) if following: @@ -87,30 +93,31 @@ def reactions_unfollow(user_id: int, shout_id: int): def is_published_author(session, user_id): - ''' checks if user has at least one publication ''' - return session.query( - Shout - ).where( - Shout.authors.contains(user_id) - ).filter( - and_( - Shout.publishedAt.is_not(None), - Shout.deletedAt.is_(None) - ) - ).count() > 0 + """checks if user has at least one publication""" + return ( + session.query(Shout) + .where(Shout.authors.contains(user_id)) + .filter(and_(Shout.publishedAt.is_not(None), Shout.deletedAt.is_(None))) + .count() + > 0 + ) def check_to_publish(session, user_id, reaction): - ''' set shout to public if publicated approvers amount > 4 ''' + """set shout to public if publicated approvers amount > 4""" if not reaction.replyTo and reaction.kind in [ ReactionKind.ACCEPT, ReactionKind.LIKE, - ReactionKind.PROOF + ReactionKind.PROOF, ]: if is_published_author(user_id): # now count how many approvers are voted already - approvers_reactions = session.query(Reaction).where(Reaction.shout == reaction.shout).all() - approvers = [user_id, ] + approvers_reactions = ( + session.query(Reaction).where(Reaction.shout == reaction.shout).all() + ) + approvers = [ + user_id, + ] for ar in approvers_reactions: a = ar.createdBy if is_published_author(session, a): @@ -121,20 +128,22 @@ def check_to_publish(session, user_id, reaction): def check_to_hide(session, user_id, reaction): - ''' hides any shout if 20% of reactions are negative ''' + """hides any shout if 20% of reactions are negative""" if not reaction.replyTo and reaction.kind in [ ReactionKind.REJECT, ReactionKind.DISLIKE, - ReactionKind.DISPROOF + ReactionKind.DISPROOF, ]: # if is_published_author(user): - approvers_reactions = session.query(Reaction).where(Reaction.shout == reaction.shout).all() + approvers_reactions = ( + session.query(Reaction).where(Reaction.shout == reaction.shout).all() + ) rejects = 0 for r in approvers_reactions: if r.kind in [ ReactionKind.REJECT, ReactionKind.DISLIKE, - ReactionKind.DISPROOF + ReactionKind.DISPROOF, ]: rejects += 1 if len(approvers_reactions) / rejects < 5: @@ -145,14 +154,14 @@ def check_to_hide(session, user_id, reaction): def set_published(session, shout_id): s = session.query(Shout).where(Shout.id == shout_id).first() s.publishedAt = datetime.now(tz=timezone.utc) - s.visibility = text('public') + s.visibility = text("public") session.add(s) session.commit() def set_hidden(session, shout_id): s = session.query(Shout).where(Shout.id == shout_id).first() - s.visibility = text('community') + s.visibility = text("community") session.add(s) session.commit() @@ -161,37 +170,46 @@ def set_hidden(session, shout_id): @login_required async def create_reaction(_, info, reaction): auth: AuthCredentials = info.context["request"].auth - reaction['createdBy'] = auth.user_id + reaction["createdBy"] = auth.user_id rdict = {} with local_session() as session: shout = session.query(Shout).where(Shout.id == reaction["shout"]).one() author = session.query(User).where(User.id == auth.user_id).one() - if reaction["kind"] in [ - ReactionKind.DISLIKE.name, - ReactionKind.LIKE.name - ]: - existing_reaction = session.query(Reaction).where( - and_( - Reaction.shout == reaction["shout"], - Reaction.createdBy == auth.user_id, - Reaction.kind == reaction["kind"], - Reaction.replyTo == reaction.get("replyTo") + if reaction["kind"] in [ReactionKind.DISLIKE.name, ReactionKind.LIKE.name]: + existing_reaction = ( + session.query(Reaction) + .where( + and_( + Reaction.shout == reaction["shout"], + Reaction.createdBy == auth.user_id, + Reaction.kind == reaction["kind"], + Reaction.replyTo == reaction.get("replyTo"), + ) ) - ).first() + .first() + ) if existing_reaction is not None: raise OperationNotAllowed("You can't vote twice") - opposite_reaction_kind = ReactionKind.DISLIKE if reaction["kind"] == ReactionKind.LIKE.name else ReactionKind.LIKE - opposite_reaction = session.query(Reaction).where( + opposite_reaction_kind = ( + ReactionKind.DISLIKE + if reaction["kind"] == ReactionKind.LIKE.name + else ReactionKind.LIKE + ) + opposite_reaction = ( + session.query(Reaction) + .where( and_( Reaction.shout == reaction["shout"], Reaction.createdBy == auth.user_id, Reaction.kind == opposite_reaction_kind, - Reaction.replyTo == reaction.get("replyTo") + Reaction.replyTo == reaction.get("replyTo"), ) - ).first() + ) + .first() + ) if opposite_reaction is not None: session.delete(opposite_reaction) @@ -199,14 +217,18 @@ async def create_reaction(_, info, reaction): r = Reaction.create(**reaction) # Proposal accepting logix - if r.replyTo is not None and \ - r.kind == ReactionKind.ACCEPT and \ - auth.user_id in shout.dict()['authors']: - replied_reaction = session.query(Reaction).where(Reaction.id == r.replyTo).first() + if ( + r.replyTo is not None + and r.kind == ReactionKind.ACCEPT + and auth.user_id in shout.dict()["authors"] + ): + replied_reaction = ( + session.query(Reaction).where(Reaction.id == r.replyTo).first() + ) if replied_reaction and replied_reaction.kind == ReactionKind.PROPOSE: if replied_reaction.range: old_body = shout.body - start, end = replied_reaction.range.split(':') + start, end = replied_reaction.range.split(":") start = int(start) end = int(end) new_body = old_body[:start] + replied_reaction.body + old_body[end:] @@ -216,8 +238,8 @@ async def create_reaction(_, info, reaction): session.add(r) session.commit() rdict = r.dict() - rdict['shout'] = shout.dict() - rdict['createdBy'] = author.dict() + rdict["shout"] = shout.dict() + rdict["createdBy"] = author.dict() # self-regulation mechanics @@ -231,11 +253,7 @@ async def create_reaction(_, info, reaction): except Exception as e: print(f"[resolvers.reactions] error on reactions autofollowing: {e}") - rdict['stat'] = { - "commented": 0, - "reacted": 0, - "rating": 0 - } + rdict["stat"] = {"commented": 0, "reacted": 0, "rating": 0} return {"reaction": rdict} @@ -250,7 +268,9 @@ async def update_reaction(_, info, id, reaction={}): q = add_reaction_stat_columns(q) q = q.group_by(Reaction.id) - [r, reacted_stat, commented_stat, rating_stat] = session.execute(q).unique().one() + [r, reacted_stat, commented_stat, rating_stat] = ( + session.execute(q).unique().one() + ) if not r: return {"error": "invalid reaction id"} @@ -268,7 +288,7 @@ async def update_reaction(_, info, id, reaction={}): r.stat = { "commented": commented_stat, "reacted": reacted_stat, - "rating": rating_stat + "rating": rating_stat, } return {"reaction": r} @@ -286,17 +306,12 @@ async def delete_reaction(_, info, id): if r.createdBy != auth.user_id: return {"error": "access denied"} - if r.kind in [ - ReactionKind.LIKE, - ReactionKind.DISLIKE - ]: + if r.kind in [ReactionKind.LIKE, ReactionKind.DISLIKE]: session.delete(r) else: r.deletedAt = datetime.now(tz=timezone.utc) session.commit() - return { - "reaction": r - } + return {"reaction": r} @query.field("loadReactionsBy") @@ -317,12 +332,10 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0): :return: Reaction[] """ - q = select( - Reaction, User, Shout - ).join( - User, Reaction.createdBy == User.id - ).join( - Shout, Reaction.shout == Shout.id + q = ( + select(Reaction, User, Shout) + .join(User, Reaction.createdBy == User.id) + .join(Shout, Reaction.shout == Shout.id) ) if by.get("shout"): @@ -340,7 +353,7 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0): if by.get("comment"): q = q.filter(func.length(Reaction.body) > 0) - if len(by.get('search', '')) > 2: + if len(by.get("search", "")) > 2: q = q.filter(Reaction.body.ilike(f'%{by["body"]}%')) if by.get("days"): @@ -348,13 +361,9 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0): q = q.filter(Reaction.createdAt > after) order_way = asc if by.get("sort", "").startswith("-") else desc - order_field = by.get("sort", "").replace('-', '') or Reaction.createdAt + order_field = by.get("sort", "").replace("-", "") or Reaction.createdAt - q = q.group_by( - Reaction.id, User.id, Shout.id - ).order_by( - order_way(order_field) - ) + q = q.group_by(Reaction.id, User.id, Shout.id).order_by(order_way(order_field)) q = add_reaction_stat_columns(q) @@ -363,13 +372,20 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0): reactions = [] with local_session() as session: - for [reaction, user, shout, reacted_stat, commented_stat, rating_stat] in session.execute(q): + for [ + reaction, + user, + shout, + reacted_stat, + commented_stat, + rating_stat, + ] in session.execute(q): reaction.createdBy = user reaction.shout = shout reaction.stat = { "rating": rating_stat, "commented": commented_stat, - "reacted": reacted_stat + "reacted": reacted_stat, } reaction.kind = reaction.kind.name diff --git a/resolvers/zine/topics.py b/resolvers/topics.py similarity index 81% rename from resolvers/zine/topics.py rename to resolvers/topics.py index f354a7b4..cead9564 100644 --- a/resolvers/zine/topics.py +++ b/resolvers/topics.py @@ -2,8 +2,8 @@ from sqlalchemy import and_, select, distinct, func from sqlalchemy.orm import aliased from auth.authenticate import login_required -from base.orm import local_session -from base.resolvers import mutation, query +from services.db import local_session +from services.schema import mutation, query from orm.shout import ShoutTopic, ShoutAuthor from orm.topic import Topic, TopicFollower from orm import User @@ -13,12 +13,19 @@ def add_topic_stat_columns(q): aliased_shout_author = aliased(ShoutAuthor) aliased_topic_follower = aliased(TopicFollower) - q = q.outerjoin(ShoutTopic, Topic.id == ShoutTopic.topic).add_columns( - func.count(distinct(ShoutTopic.shout)).label('shouts_stat') - ).outerjoin(aliased_shout_author, ShoutTopic.shout == aliased_shout_author.shout).add_columns( - func.count(distinct(aliased_shout_author.user)).label('authors_stat') - ).outerjoin(aliased_topic_follower).add_columns( - func.count(distinct(aliased_topic_follower.follower)).label('followers_stat') + q = ( + q.outerjoin(ShoutTopic, Topic.id == ShoutTopic.topic) + .add_columns(func.count(distinct(ShoutTopic.shout)).label("shouts_stat")) + .outerjoin(aliased_shout_author, ShoutTopic.shout == aliased_shout_author.shout) + .add_columns( + func.count(distinct(aliased_shout_author.user)).label("authors_stat") + ) + .outerjoin(aliased_topic_follower) + .add_columns( + func.count(distinct(aliased_topic_follower.follower)).label( + "followers_stat" + ) + ) ) q = q.group_by(Topic.id) @@ -31,7 +38,7 @@ def add_stat(topic, stat_columns): topic.stat = { "shouts": shouts_stat, "authors": authors_stat, - "followers": followers_stat + "followers": followers_stat, } return topic @@ -133,12 +140,10 @@ def topic_unfollow(user_id, slug): try: with local_session() as session: sub = ( - session.query(TopicFollower).join(Topic).filter( - and_( - TopicFollower.follower == user_id, - Topic.slug == slug - ) - ).first() + session.query(TopicFollower) + .join(Topic) + .filter(and_(TopicFollower.follower == user_id, Topic.slug == slug)) + .first() ) if sub: session.delete(sub) diff --git a/services/stat/ackee.graphql b/schemas/ackee.graphql similarity index 100% rename from services/stat/ackee.graphql rename to schemas/ackee.graphql diff --git a/schema.graphql b/schemas/core.graphql similarity index 79% rename from schema.graphql rename to schemas/core.graphql index 86f6f8c6..4115d29b 100644 --- a/schema.graphql +++ b/schemas/core.graphql @@ -2,12 +2,6 @@ scalar DateTime ################################### Payload ################################### -enum MessageStatus { - NEW - UPDATED - DELETED -} - type UserFollowings { unread: Int topics: [String] @@ -23,18 +17,6 @@ type AuthResult { news: UserFollowings } -type ChatMember { - id: Int! - slug: String! - name: String! - userpic: String - lastSeen: DateTime - online: Boolean - # invitedAt: DateTime - # invitedBy: String # user slug - # TODO: keep invite databit -} - type AuthorStat { followings: Int followers: Int @@ -62,11 +44,6 @@ type Author { type Result { error: String slugs: [String] - chat: Chat - chats: [Chat] - message: Message - messages: [Message] - members: [ChatMember] shout: Shout shouts: [Shout] author: Author @@ -140,12 +117,6 @@ input ReactionInput { replyTo: Int } -input ChatInput { - id: String! - title: String - description: String -} - enum FollowingEntity { TOPIC AUTHOR @@ -156,15 +127,6 @@ enum FollowingEntity { ################################### Mutation type Mutation { - # inbox - createChat(title: String, members: [Int]!): Result! - updateChat(chat: ChatInput!): Result! - deleteChat(chatId: String!): Result! - - createMessage(chat: String!, body: String!, replyTo: Int): Result! - updateMessage(chatId: String!, id: Int!, body: String!): Result! - deleteMessage(chatId: String!, id: Int!): Result! - markAsRead(chatId: String!, ids: [Int]!): Result! # auth getSession: AuthResult! @@ -198,15 +160,6 @@ type Mutation { unfollow(what: FollowingEntity!, slug: String!): Result! } -input MessagesBy { - author: String - body: String - chat: String - order: String - days: Int - stat: String -} - input AuthorsBy { lastSeen: DateTime createdAt: DateTime @@ -252,12 +205,6 @@ input ReactionBy { ################################### Query type Query { - # inbox - loadChats( limit: Int, offset: Int): Result! # your chats - loadMessagesBy(by: MessagesBy!, limit: Int, offset: Int): Result! - loadRecipients(limit: Int, offset: Int): Result! - searchRecipients(query: String!, limit: Int, offset: Int): Result! - searchMessages(by: MessagesBy!, limit: Int, offset: Int): Result! # auth isEmailUsed(email: String!): Boolean! @@ -288,14 +235,6 @@ type Query { topicsByAuthor(author: String!): [Topic]! } -############################################ Subscription - -type Subscription { - newMessage: Message # new messages in inbox - newShout: Shout # personal feed new shout - newReaction: Reaction # new reactions to notify -} - ############################################ Entities type Resource { @@ -474,29 +413,3 @@ type Token { usedAt: DateTime value: String! } - -type Message { - author: Int! - chatId: String! - body: String! - createdAt: Int! - id: Int! - replyTo: Int - updatedAt: Int - seen: Boolean -} - -type Chat { - id: String! - createdAt: Int! - createdBy: Int! - updatedAt: Int! - title: String - description: String - users: [Int] - members: [ChatMember] - admins: [Int] - messages: [Message] - unread: Int - private: Boolean -} diff --git a/base/orm.py b/services/db.py similarity index 100% rename from base/orm.py rename to services/db.py diff --git a/services/exceptions.py b/services/exceptions.py new file mode 100644 index 00000000..95511db2 --- /dev/null +++ b/services/exceptions.py @@ -0,0 +1,39 @@ +from starlette.exceptions import HTTPException + + +# TODO: remove traceback from logs for defined exceptions + + +class BaseHttpException(HTTPException): + states_code = 500 + detail = "500 Server error" + + +class ExpiredToken(BaseHttpException): + states_code = 401 + detail = "401 Expired Token" + + +class InvalidToken(BaseHttpException): + states_code = 401 + detail = "401 Invalid Token" + + +class Unauthorized(BaseHttpException): + states_code = 401 + detail = "401 Unauthorized" + + +class ObjectNotExist(BaseHttpException): + code = 404 + detail = "404 Object Does Not Exist" + + +class OperationNotAllowed(BaseHttpException): + states_code = 403 + detail = "403 Operation Is Not Allowed" + + +class InvalidPassword(BaseHttpException): + states_code = 403 + message = "403 Invalid Password" diff --git a/services/inbox/presence.py b/services/inbox/presence.py deleted file mode 100644 index 2815c998..00000000 --- a/services/inbox/presence.py +++ /dev/null @@ -1,46 +0,0 @@ -# from base.exceptions import Unauthorized -from auth.tokenstorage import SessionToken -from base.redis import redis - - -async def set_online_status(user_id, status): - if user_id: - if status: - await redis.execute("SADD", "users-online", user_id) - else: - await redis.execute("SREM", "users-online", user_id) - - -async def on_connect(req, params): - if not isinstance(params, dict): - req.scope["connection_params"] = {} - return - token = params.get('token') - if not token: - # raise Unauthorized("Please login") - return { - "error": "Please login first" - } - else: - payload = await SessionToken.verify(token) - if payload and payload.user_id: - req.scope["user_id"] = payload.user_id - await set_online_status(payload.user_id, True) - - -async def on_disconnect(req): - user_id = req.scope.get("user_id") - await set_online_status(user_id, False) - - -# FIXME: not used yet -def context_value(request): - context = {} - print(f"[inbox.presense] request debug: {request}") - if request.scope["type"] == "websocket": - # request is an instance of WebSocket - context.update(request.scope["connection_params"]) - else: - context["token"] = request.META.get("authorization") - - return context diff --git a/services/inbox/sse.py b/services/inbox/sse.py deleted file mode 100644 index a73af840..00000000 --- a/services/inbox/sse.py +++ /dev/null @@ -1,22 +0,0 @@ -from sse_starlette.sse import EventSourceResponse -from starlette.requests import Request -from graphql.type import GraphQLResolveInfo -from resolvers.inbox.messages import message_generator -# from base.exceptions import Unauthorized - -# https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md - - -async def sse_messages(request: Request): - print(f'[SSE] request\n{request}\n') - info = GraphQLResolveInfo() - info.context['request'] = request.scope - user_id = request.scope['user'].user_id - if user_id: - event_generator = await message_generator(None, info) - return EventSourceResponse(event_generator) - else: - # raise Unauthorized("Please login") - return { - "error": "Please login first" - } diff --git a/services/main.py b/services/main.py index 10301b86..51312033 100644 --- a/services/main.py +++ b/services/main.py @@ -1,13 +1,13 @@ from services.search import SearchService -from services.stat.viewed import ViewedStorage -from base.orm import local_session +from stat.viewed import ViewedStorage +from services.db import local_session async def storages_init(): with local_session() as session: - print('[main] initialize SearchService') + print("[main] initialize SearchService") await SearchService.init(session) - print('[main] SearchService initialized') - print('[main] initialize storages') + print("[main] SearchService initialized") + print("[main] initialize storages") await ViewedStorage.init() - print('[main] storages initialized') + print("[main] storages initialized") diff --git a/services/presence.py b/services/presence.py new file mode 100644 index 00000000..d378c859 --- /dev/null +++ b/services/presence.py @@ -0,0 +1,35 @@ +import json +from orm.reaction import Reaction +from orm.shout import Shout +from services.redis import redis + + +async def notify_reaction(reaction: Reaction): + channel_name = f"new_reaction" + data = {**reaction, "kind": f"new_reaction{reaction.kind}"} + try: + await redis.publish(channel_name, json.dumps(data)) + except Exception as e: + print(f"Failed to publish to channel {channel_name}: {e}") + + +async def notify_shout(shout: Shout): + channel_name = f"new_shout" + data = {**shout, "kind": "new_shout"} + try: + await redis.publish(channel_name, json.dumps(data)) + except Exception as e: + print(f"Failed to publish to channel {channel_name}: {e}") + + +async def notify_follower(follower_id: int, author_id: int): + channel_name = f"new_follower" + data = { + "follower_id": follower_id, + "author_id": author_id, + "kind": "new_follower", + } + try: + await redis.publish(channel_name, json.dumps(data)) + except Exception as e: + print(f"Failed to publish to channel {channel_name}: {e}") diff --git a/services/redis.py b/services/redis.py new file mode 100644 index 00000000..be7e3106 --- /dev/null +++ b/services/redis.py @@ -0,0 +1,58 @@ +import asyncio +import aredis +from settings import REDIS_URL + + +class RedisCache: + def __init__(self, uri=REDIS_URL): + self._uri: str = uri + self.pubsub_channels = [] + self._instance = None + + async def connect(self): + self._instance = aredis.StrictRedis.from_url(self._uri, decode_responses=True) + + async def disconnect(self): + self._instance.connection_pool.disconnect() + self._instance = None + + async def execute(self, command, *args, **kwargs): + while not self._instance: + await asyncio.sleep(1) + try: + print("[redis] " + command + " " + " ".join(args)) + return await self._instance.execute_command(command, *args, **kwargs) + except Exception: + pass + + async def subscribe(self, *channels): + if not self._instance: + await self.connect() + for channel in channels: + await self._instance.subscribe(channel) + self.pubsub_channels.append(channel) + + async def unsubscribe(self, *channels): + if not self._instance: + return + for channel in channels: + await self._instance.unsubscribe(channel) + self.pubsub_channels.remove(channel) + + async def publish(self, channel, data): + if not self._instance: + return + await self._instance.publish(channel, data) + + async def lrange(self, key, start, stop): + print(f"[redis] LRANGE {key} {start} {stop}") + return await self._instance.lrange(key, start, stop) + + async def mget(self, key, *keys): + print(f"[redis] MGET {key} {keys}") + return await self._instance.mget(key, *keys) + + +redis = RedisCache() + +__all__ = ["redis"] diff --git a/base/resolvers.py b/services/schema.py similarity index 100% rename from base/resolvers.py rename to services/schema.py diff --git a/services/search.py b/services/search.py index 834e5bf7..d2e29e9f 100644 --- a/services/search.py +++ b/services/search.py @@ -1,8 +1,8 @@ import asyncio import json -from base.redis import redis -from orm.shout import Shout -from resolvers.zine.load import load_shouts_by +from services.redis import redis +from db.shout import Shout +from schema.zine.load import load_shouts_by class SearchService: @@ -12,7 +12,7 @@ class SearchService: @staticmethod async def init(session): async with SearchService.lock: - print('[search.service] did nothing') + print("[search.service] did nothing") SearchService.cache = {} @staticmethod @@ -24,7 +24,7 @@ class SearchService: "title": text, "body": text, "limit": limit, - "offset": offset + "offset": offset, } payload = await load_shouts_by(None, None, options) await redis.execute("SET", text, json.dumps(payload)) diff --git a/services/stat/viewed.py b/services/viewed.py similarity index 76% rename from services/stat/viewed.py rename to services/viewed.py index 905ade43..21d688b7 100644 --- a/services/stat/viewed.py +++ b/services/viewed.py @@ -6,13 +6,13 @@ from ssl import create_default_context from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport -from sqlalchemy import func -from base.orm import local_session -from orm import User, Topic +from services.db import local_session +from orm import Topic from orm.shout import ShoutTopic, Shout -load_facts = gql(""" +load_facts = gql( + """ query getDomains { domains { id @@ -25,9 +25,11 @@ query getDomains { } } } -""") +""" +) -load_pages = gql(""" +load_pages = gql( + """ query getDomains { domains { title @@ -41,8 +43,9 @@ query getDomains { } } } -""") -schema_str = open(path.dirname(__file__) + '/ackee.graphql').read() +""" +) +schema_str = open(path.dirname(__file__) + "/ackee.graphql").read() token = environ.get("ACKEE_TOKEN", "") @@ -52,8 +55,8 @@ def create_client(headers=None, schema=None): transport=AIOHTTPTransport( url="https://ackee.discours.io/api", ssl=create_default_context(), - headers=headers - ) + headers=headers, + ), ) @@ -71,21 +74,24 @@ class ViewedStorage: @staticmethod async def init(): - """ graphql client connection using permanent token """ + """graphql client connection using permanent token""" self = ViewedStorage async with self.lock: if token: - self.client = create_client({ - "Authorization": "Bearer %s" % str(token) - }, schema=schema_str) - print("[stat.viewed] * authorized permanentely by ackee.discours.io: %s" % token) + self.client = create_client( + {"Authorization": "Bearer %s" % str(token)}, schema=schema_str + ) + print( + "[stat.viewed] * authorized permanentely by ackee.discours.io: %s" + % token + ) else: print("[stat.viewed] * please set ACKEE_TOKEN") self.disabled = True @staticmethod async def update_pages(): - """ query all the pages from ackee sorted by views count """ + """query all the pages from ackee sorted by views count""" print("[stat.viewed] ⎧ updating ackee pages data ---") start = time.time() self = ViewedStorage @@ -96,7 +102,7 @@ class ViewedStorage: try: for page in self.pages: p = page["value"].split("?")[0] - slug = p.split('discours.io/')[-1] + slug = p.split("discours.io/")[-1] shouts[slug] = page["count"] for slug in shouts.keys(): await ViewedStorage.increment(slug, shouts[slug]) @@ -118,7 +124,7 @@ class ViewedStorage: # unused yet @staticmethod async def get_shout(shout_slug): - """ getting shout views metric by slug """ + """getting shout views metric by slug""" self = ViewedStorage async with self.lock: shout_views = self.by_shouts.get(shout_slug) @@ -126,7 +132,9 @@ class ViewedStorage: shout_views = 0 with local_session() as session: try: - shout = session.query(Shout).where(Shout.slug == shout_slug).one() + shout = ( + session.query(Shout).where(Shout.slug == shout_slug).one() + ) self.by_shouts[shout_slug] = shout.views self.update_topics(session, shout_slug) except Exception as e: @@ -136,7 +144,7 @@ class ViewedStorage: @staticmethod async def get_topic(topic_slug): - """ getting topic views value summed """ + """getting topic views value summed""" self = ViewedStorage topic_views = 0 async with self.lock: @@ -146,24 +154,28 @@ class ViewedStorage: @staticmethod def update_topics(session, shout_slug): - """ updates topics counters by shout slug """ + """updates topics counters by shout slug""" self = ViewedStorage - for [shout_topic, topic] in session.query(ShoutTopic, Topic).join(Topic).join(Shout).where( - Shout.slug == shout_slug - ).all(): + for [shout_topic, topic] in ( + session.query(ShoutTopic, Topic) + .join(Topic) + .join(Shout) + .where(Shout.slug == shout_slug) + .all() + ): if not self.by_topics.get(topic.slug): self.by_topics[topic.slug] = {} self.by_topics[topic.slug][shout_slug] = self.by_shouts[shout_slug] @staticmethod - async def increment(shout_slug, amount=1, viewer='ackee'): - """ the only way to change views counter """ + async def increment(shout_slug, amount=1, viewer="ackee"): + """the only way to change views counter""" self = ViewedStorage async with self.lock: # TODO optimize, currenty we execute 1 DB transaction per shout with local_session() as session: shout = session.query(Shout).where(Shout.slug == shout_slug).one() - if viewer == 'old-discours': + if viewer == "old-discours": # this is needed for old db migration if shout.viewsOld == amount: print(f"viewsOld amount: {amount}") @@ -174,7 +186,9 @@ class ViewedStorage: if shout.viewsAckee == amount: print(f"viewsAckee amount: {amount}") else: - print(f"viewsAckee amount changed: {shout.viewsAckee} --> {amount}") + print( + f"viewsAckee amount changed: {shout.viewsAckee} --> {amount}" + ) shout.viewsAckee = amount session.commit() @@ -185,7 +199,7 @@ class ViewedStorage: @staticmethod async def worker(): - """ async task worker """ + """async task worker""" failed = 0 self = ViewedStorage if self.disabled: @@ -205,9 +219,10 @@ class ViewedStorage: if failed == 0: when = datetime.now(timezone.utc) + timedelta(seconds=self.period) t = format(when.astimezone().isoformat()) - print("[stat.viewed] ⎩ next update: %s" % ( - t.split("T")[0] + " " + t.split("T")[1].split(".")[0] - )) + print( + "[stat.viewed] ⎩ next update: %s" + % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]) + ) await asyncio.sleep(self.period) else: await asyncio.sleep(10) diff --git a/services/zine/gittask.py b/services/zine/gittask.py deleted file mode 100644 index 31e55025..00000000 --- a/services/zine/gittask.py +++ /dev/null @@ -1,70 +0,0 @@ -import asyncio -import subprocess -from pathlib import Path - -from settings import SHOUTS_REPO - - -class GitTask: - """every shout update use a new task""" - - queue = asyncio.Queue() - - def __init__(self, input, username, user_email, comment): - self.slug = input["slug"] - self.shout_body = input["body"] - self.username = username - self.user_email = user_email - self.comment = comment - - GitTask.queue.put_nowait(self) - - def init_repo(self): - repo_path = "%s" % (SHOUTS_REPO) - - Path(repo_path).mkdir() - - cmd = ( - "cd %s && git init && " - "git config user.name 'discours' && " - "git config user.email 'discours@discours.io' && " - "touch initial && git add initial && " - "git commit -m 'init repo'" % (repo_path) - ) - output = subprocess.check_output(cmd, shell=True) - print(output) - - def execute(self): - repo_path = "%s" % (SHOUTS_REPO) - - if not Path(repo_path).exists(): - self.init_repo() - - # cmd = "cd %s && git checkout master" % (repo_path) - # output = subprocess.check_output(cmd, shell=True) - # print(output) - - shout_filename = "%s.mdx" % (self.slug) - shout_full_filename = "%s/%s" % (repo_path, shout_filename) - with open(shout_full_filename, mode="w", encoding="utf-8") as shout_file: - shout_file.write(bytes(self.shout_body, "utf-8").decode("utf-8", "ignore")) - - author = "%s <%s>" % (self.username, self.user_email) - cmd = "cd %s && git add %s && git commit -m '%s' --author='%s'" % ( - repo_path, - shout_filename, - self.comment, - author, - ) - output = subprocess.check_output(cmd, shell=True) - print(output) - - @staticmethod - async def git_task_worker(): - print("[service.git] starting task worker") - while True: - task = await GitTask.queue.get() - try: - task.execute() - except Exception as err: - print("[service.git] worker error: %s" % (err))