From 6cb5061ce5dc426e505d3a65bbc592be7d3c23e0 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Thu, 21 Jul 2022 14:58:50 +0300 Subject: [PATCH] wip refactoring: reactions, storages isolated --- auth/authenticate.py | 13 +- auth/authorize.py | 6 +- auth/email.py | 2 +- auth/{token.py => jwtcodec.py} | 4 +- inbox_resolvers/inbox.py | 37 ++-- main.py | 20 +- migrate.py | 187 ++++++++++++----- migration/export.py | 9 +- migration/tables/comments.py | 106 +++++----- migration/tables/content_items.py | 55 ++--- migration/tables/users.py | 9 +- orm/__init__.py | 30 ++- orm/comment.py | 30 --- orm/community.py | 12 +- orm/notification.py | 6 +- orm/proposal.py | 33 --- orm/rbac.py | 37 +--- orm/reaction.py | 51 +++++ orm/shout.py | 330 ++---------------------------- orm/topic.py | 61 +----- orm/user.py | 63 +----- pyproject.toml | 2 +- resolvers/__init__.py | 55 +++-- resolvers/auth.py | 2 - resolvers/collab.py | 223 +------------------- resolvers/comments.py | 136 ------------ resolvers/community.py | 41 ++-- resolvers/editor.py | 11 +- resolvers/feed.py | 41 ++++ resolvers/profile.py | 320 +++++++++++------------------ resolvers/reactions.py | 150 ++++++++++++++ resolvers/topics.py | 34 ++- resolvers/zine.py | 296 +++------------------------ schema.graphql | 292 ++++++++++++-------------- storages/gittask.py | 62 ++++++ storages/reactions.py | 152 ++++++++++++++ storages/roles.py | 36 ++++ storages/shoutauthor.py | 42 ++++ storages/shoutscache.py | 150 ++++++++++++++ storages/topics.py | 57 ++++++ storages/topicstat.py | 85 ++++++++ storages/users.py | 43 ++++ storages/viewed.py | 122 +++++++++++ 43 files changed, 1674 insertions(+), 1779 deletions(-) rename auth/{token.py => jwtcodec.py} (97%) delete mode 100644 orm/comment.py delete mode 100644 orm/proposal.py create mode 100644 orm/reaction.py delete mode 100644 resolvers/comments.py create mode 100644 resolvers/feed.py create mode 100644 resolvers/reactions.py create mode 100644 storages/gittask.py create mode 100644 storages/reactions.py create mode 100644 storages/roles.py create mode 100644 storages/shoutauthor.py create mode 100644 storages/shoutscache.py create mode 100644 storages/topics.py create mode 100644 storages/topicstat.py create mode 100644 storages/users.py create mode 100644 storages/viewed.py diff --git a/auth/authenticate.py b/auth/authenticate.py index 86e5eb9a..a3b96017 100644 --- a/auth/authenticate.py +++ b/auth/authenticate.py @@ -9,10 +9,11 @@ from starlette.authentication import AuthenticationBackend from starlette.requests import HTTPConnection from auth.credentials import AuthCredentials, AuthUser -from auth.token import Token +from auth.jwtcodec import JWTCodec from auth.authorize import Authorize, TokenStorage from exceptions import InvalidToken, OperationNotAllowed -from orm import User, UserStorage +from orm.user import User +from storages.users import UserStorage from orm.base import local_session from redis import redis from settings import JWT_AUTH_HEADER, EMAIL_TOKEN_LIFE_SPAN @@ -32,9 +33,9 @@ class _Authenticate: token is of specified type """ try: - payload = Token.decode(token) + payload = JWTCodec.decode(token) except ExpiredSignatureError: - payload = Token.decode(token, verify_exp=False) + payload = JWTCodec.decode(token, verify_exp=False) if not await cls.exists(payload.user_id, token): raise InvalidToken("Login expired, please login again") if payload.device == "mobile": # noqa @@ -109,14 +110,14 @@ class ResetPassword: @staticmethod async def get_reset_token(user): exp = datetime.utcnow() + timedelta(seconds=EMAIL_TOKEN_LIFE_SPAN) - token = Token.encode(user, exp=exp, device="pc") + token = JWTCodec.encode(user, exp=exp, device="pc") await TokenStorage.save(f"{user.id}-reset-{token}", EMAIL_TOKEN_LIFE_SPAN, True) return token @staticmethod async def verify(token): try: - payload = Token.decode(token) + payload = JWTCodec.decode(token) except ExpiredSignatureError: raise InvalidToken("Login expired, please login again") except DecodeError as e: diff --git a/auth/authorize.py b/auth/authorize.py index 5bc8f8a7..3bcbf7be 100644 --- a/auth/authorize.py +++ b/auth/authorize.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from auth.token import Token +from auth.jwtcodec import JWTCodec from redis import redis from settings import JWT_LIFE_SPAN from auth.validations import User @@ -22,14 +22,14 @@ class Authorize: @staticmethod async def authorize(user: User, device: str = "pc", life_span = JWT_LIFE_SPAN, auto_delete=True) -> str: exp = datetime.utcnow() + timedelta(seconds=life_span) - token = Token.encode(user, exp=exp, device=device) + token = JWTCodec.encode(user, exp=exp, device=device) await TokenStorage.save(f"{user.id}-{token}", life_span, auto_delete) return token @staticmethod async def revoke(token: str) -> bool: try: - payload = Token.decode(token) + payload = JWTCodec.decode(token) except: # noqa pass else: diff --git a/auth/email.py b/auth/email.py index 5be246e1..7a586daf 100644 --- a/auth/email.py +++ b/auth/email.py @@ -19,7 +19,7 @@ def load_email_templates(): filename = "templates/%s.tmpl" % name with open(filename) as f: email_templates[name] = f.read() - print("[email.service] templates loaded") + print("[auth.email] templates loaded") async def send_confirm_email(user): text = email_templates["confirm_email"] diff --git a/auth/token.py b/auth/jwtcodec.py similarity index 97% rename from auth/token.py rename to auth/jwtcodec.py index e71db18c..30b86dfe 100644 --- a/auth/token.py +++ b/auth/jwtcodec.py @@ -1,12 +1,10 @@ from datetime import datetime - import jwt - from settings import JWT_ALGORITHM, JWT_SECRET_KEY from auth.validations import PayLoad, User -class Token: +class JWTCodec: @staticmethod def encode(user: User, exp: datetime, device: str = "pc") -> str: payload = {"user_id": user.id, "device": device, "exp": exp, "iat": datetime.utcnow()} diff --git a/inbox_resolvers/inbox.py b/inbox_resolvers/inbox.py index 7844579d..8e6e791d 100644 --- a/inbox_resolvers/inbox.py +++ b/inbox_resolvers/inbox.py @@ -1,6 +1,3 @@ -from orm import User -from orm.base import local_session - from resolvers_base import mutation, query, subscription from auth.authenticate import login_required @@ -10,7 +7,7 @@ from datetime import datetime from redis import redis -class MessageSubscription: +class ChatFollowing: queue = asyncio.Queue() def __init__(self, chat_id): @@ -18,42 +15,42 @@ class MessageSubscription: class MessagesStorage: lock = asyncio.Lock() - subscriptions = [] + chats = [] @staticmethod - async def register_subscription(subs): + async def register_chat(chat): async with MessagesStorage.lock: - MessagesStorage.subscriptions.append(subs) + MessagesStorage.chats.append(chat) @staticmethod - async def del_subscription(subs): + async def remove_chat(chat): async with MessagesStorage.lock: - MessagesStorage.subscriptions.remove(subs) + MessagesStorage.chats.remove(chat) @staticmethod async def put(message_result): async with MessagesStorage.lock: - for subs in MessagesStorage.subscriptions: - if message_result.message["chatId"] == subs.chat_id: - subs.queue.put_nowait(message_result) + for chat in MessagesStorage.chats: + if message_result.message["chatId"] == chat.chat_id: + chat.queue.put_nowait(message_result) class MessageResult: def __init__(self, status, message): self.status = status self.message = message -async def get_total_unread_messages_for_user(user_slug): +async def get_inbox_counter(user_slug): chats = await redis.execute("GET", f"chats_by_user/{user_slug}") if not chats: return 0 chats = json.loads(chats) - total = 0 + unread = 0 for chat_id in chats: n = await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}") - total += n + unread += n - return total + return unread async def add_user_to_chat(user_slug, chat_id, chat = None): chats = await redis.execute("GET", f"chats_by_user/{user_slug}") @@ -264,13 +261,13 @@ async def message_generator(obj, info, chatId): # yield {"error" : auth.error_message or "Please login"} try: - subs = MessageSubscription(chatId) - await MessagesStorage.register_subscription(subs) + following_chat = ChatFollowing(chatId) + await MessagesStorage.register_chat(following_chat) while True: - msg = await subs.queue.get() + msg = await following_chat.queue.get() yield msg finally: - await MessagesStorage.del_subscription(subs) + await MessagesStorage.remove_chat(following_chat) @subscription.field("chatUpdated") def message_resolver(message, info, chatId): diff --git a/main.py b/main.py index 34ef8948..bafb1d4f 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,4 @@ from importlib import import_module - from ariadne import load_schema_from_path, make_executable_schema from ariadne.asgi import GraphQL from starlette.applications import Starlette @@ -7,16 +6,17 @@ from starlette.middleware import Middleware from starlette.middleware.authentication import AuthenticationMiddleware from starlette.middleware.sessions import SessionMiddleware from starlette.routing import Route - from auth.authenticate import JWTAuthenticate from auth.oauth import oauth_login, oauth_authorize from auth.email import email_authorize from redis import redis from resolvers.base import resolvers -from resolvers.zine import GitTask, ShoutsCache - -from orm.shout import ShoutViewStorage, TopicStat, ShoutAuthorStorage, CommentStat - +from resolvers.zine import ShoutsCache +from storages.viewed import ViewedStorage +# from storages.gittask import GitTask +from storages.topicstat import TopicStat +from storages.shoutauthor import ShoutAuthorStorage +from storages.reactions import ReactionsStorage import asyncio import_module('resolvers') @@ -29,18 +29,18 @@ middleware = [ async def start_up(): await redis.connect() - git_task = asyncio.create_task(GitTask.git_task_worker()) + viewed_storage_task = asyncio.create_task(ViewedStorage.worker()) shouts_cache_task = asyncio.create_task(ShoutsCache.worker()) - view_storage_task = asyncio.create_task(ShoutViewStorage.worker()) + reaction_stat_task = asyncio.create_task(ReactionsStorage.worker()) shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) topic_stat_task = asyncio.create_task(TopicStat.worker()) - comment_stat_task = asyncio.create_task(CommentStat.worker()) + # FIXME git_task = asyncio.create_task(GitTask.git_task_worker()) async def shutdown(): await redis.disconnect() routes = [ - Route("/oauth/{provider}", endpoint=oauth_login), + Route("/oauth/{provider}", endpoint=oauth_login), # TODO: isolate auth microservice Route("/oauth_authorize", endpoint=oauth_authorize), Route("/email_authorize", endpoint=email_authorize) ] diff --git a/migrate.py b/migrate.py index baa00e87..4c0d8e7f 100644 --- a/migrate.py +++ b/migrate.py @@ -1,33 +1,40 @@ ''' cmd managed migration ''' +from datetime import datetime import json -from migration.export import export_email_subscriptions, export_mdx, export_slug +import subprocess +import sys + +from click import prompt +# from migration.export import export_email_subscriptions +from migration.export import export_mdx, export_slug from migration.tables.users import migrate as migrateUser from migration.tables.users import migrate_2stage as migrateUser_2stage from migration.tables.content_items import get_shout_slug, migrate as migrateShout from migration.tables.topics import migrate as migrateTopic from migration.tables.comments import migrate as migrateComment from migration.tables.comments import migrate_2stage as migrateComment_2stage -from orm.base import local_session -from orm.community import Community -from orm.user import User +from orm.reaction import Reaction + +TODAY = datetime.strftime(datetime.now(), '%Y%m%d') OLD_DATE = '2016-03-05 22:22:00.350000' + def users_handle(storage): ''' migrating users first ''' counter = 0 id_map = {} - print('[migration] migrating %d users' %(len(storage['users']['data']))) + print('[migration] migrating %d users' % (len(storage['users']['data']))) for entry in storage['users']['data']: oid = entry['_id'] user = migrateUser(entry) - storage['users']['by_oid'][oid] = user # full + storage['users']['by_oid'][oid] = user # full del user['password'] del user['notifications'] del user['emailConfirmed'] del user['username'] del user['email'] - storage['users']['by_slug'][user['slug']] = user # public + storage['users']['by_slug'][user['slug']] = user # public id_map[user['oid']] = user['slug'] counter += 1 ce = 0 @@ -53,13 +60,16 @@ def topics_handle(storage): oid = storage['topics']['by_slug'][oldslug]['_id'] del storage['topics']['by_slug'][oldslug] storage['topics']['by_oid'][oid] = storage['topics']['by_slug'][newslug] - print( '[migration] ' + str(counter) + ' topics migrated') - print( '[migration] ' + str(len(storage['topics']['by_oid'].values())) + ' topics by oid' ) - print( '[migration] ' + str(len(storage['topics']['by_slug'].values())) + ' topics by slug' ) + print('[migration] ' + str(counter) + ' topics migrated') + print('[migration] ' + str(len(storage['topics'] + ['by_oid'].values())) + ' topics by oid') + print('[migration] ' + str(len(storage['topics'] + ['by_slug'].values())) + ' topics by slug') # raise Exception return storage -def shouts_handle(storage): + +def shouts_handle(storage, args): ''' migrating content items one by one ''' counter = 0 discours_author = 0 @@ -69,7 +79,7 @@ def shouts_handle(storage): slug = get_shout_slug(entry) # single slug mode - if '-' in sys.argv and slug not in sys.argv: continue + if '-' in args and slug not in args: continue # migrate shout = migrateShout(entry, storage) @@ -80,11 +90,11 @@ def shouts_handle(storage): # wuth author author = shout['authors'][0].slug - if author =='discours': discours_author += 1 + if author == 'discours': discours_author += 1 # print('[migration] ' + shout['slug'] + ' with author ' + author) if entry.get('published'): - if 'mdx' in sys.argv: export_mdx(shout) + if 'mdx' in args: export_mdx(shout) pub_counter += 1 # print main counter @@ -97,43 +107,57 @@ def shouts_handle(storage): print('[migration] ' + str(discours_author) + ' authored by @discours') return storage + def comments_handle(storage): id_map = {} ignored_counter = 0 - for oldcomment in storage['comments']['data']: - comment = migrateComment(oldcomment, storage) - if not comment: - print('[migration] comment ignored \n%r\n' % oldcomment) - ignored_counter += 1 - continue - id = comment.get('id') - oid = comment.get('oid') - id_map[oid] = id + missed_shouts = {} + for oldcomment in storage['reactions']['data']: + if not oldcomment.get('deleted'): + reaction = migrateComment(oldcomment, storage) + if type(reaction) == str: + missed_shouts[reaction] = oldcomment + elif type(reaction) == Reaction: + reaction = reaction.dict() + id = reaction['id'] + oid = reaction['oid'] + id_map[oid] = id + else: + ignored_counter += 1 - for comment in storage['comments']['data']: migrateComment_2stage(comment, id_map) + for reaction in storage['reactions']['data']: migrateComment_2stage( + reaction, id_map) print('[migration] ' + str(len(id_map)) + ' comments migrated') print('[migration] ' + str(ignored_counter) + ' comments ignored') + print('[migration] ' + str(len(missed_shouts.keys())) + + ' commented shouts missed') + missed_counter = 0 + for missed in missed_shouts.values(): + missed_counter += len(missed) + print('[migration] ' + str(missed_counter) + ' comments dropped') return storage - + def bson_handle(): # decode bson # preparing data from migration import bson2json bson2json.json_tables() + def export_one(slug, storage): topics_handle(storage) users_handle(storage) shouts_handle(storage) export_slug(slug, storage) -def all_handle(storage): - print('[migration] everything!') + +def all_handle(storage, args): + print('[migration] handle everything') users_handle(storage) topics_handle(storage) - shouts_handle(storage) + shouts_handle(storage, args) comments_handle(storage) - export_email_subscriptions() + # export_email_subscriptions() print('[migration] done!') @@ -148,7 +172,7 @@ def data_load(): 'by_slug': {}, 'data': [] }, - 'comments': { + 'reactions': { 'by_oid': {}, 'by_slug': {}, 'by_content': {}, @@ -174,65 +198,116 @@ def data_load(): content_data = [] try: users_data = json.loads(open('migration/data/users.json').read()) - print('[migration] ' + str(len(users_data)) + ' users loaded') + print('[migration] ' + str(len(users_data)) + ' users ') tags_data = json.loads(open('migration/data/tags.json').read()) storage['topics']['tags'] = tags_data - print('[migration] ' + str(len(tags_data)) + ' tags loaded') - cats_data = json.loads(open('migration/data/content_item_categories.json').read()) + print('[migration] ' + str(len(tags_data)) + ' tags ') + cats_data = json.loads( + open('migration/data/content_item_categories.json').read()) storage['topics']['cats'] = cats_data - print('[migration] ' + str(len(cats_data)) + ' cats loaded') + print('[migration] ' + str(len(cats_data)) + ' cats ') comments_data = json.loads(open('migration/data/comments.json').read()) - storage['comments']['data'] = comments_data - print('[migration] ' + str(len(comments_data)) + ' comments loaded') + storage['reactions']['data'] = comments_data + print('[migration] ' + str(len(comments_data)) + ' comments ') content_data = json.loads(open('migration/data/content_items.json').read()) storage['shouts']['data'] = content_data - print('[migration] ' + str(len(content_data)) + ' content items loaded') + print('[migration] ' + str(len(content_data)) + ' content items ') # fill out storage - for x in users_data: + for x in users_data: storage['users']['by_oid'][x['_id']] = x - # storage['users']['by_slug'][x['slug']] = x + # storage['users']['by_slug'][x['slug']] = x # no user.slug yet - print('[migration] ' + str(len(storage['users']['by_oid'].keys())) + ' users by oid') - for x in tags_data: + print('[migration] ' + str(len(storage['users'] + ['by_oid'].keys())) + ' users by oid') + for x in tags_data: storage['topics']['by_oid'][x['_id']] = x storage['topics']['by_slug'][x['slug']] = x for x in cats_data: storage['topics']['by_oid'][x['_id']] = x storage['topics']['by_slug'][x['slug']] = x - print('[migration] ' + str(len(storage['topics']['by_slug'].keys())) + ' topics by slug') + print('[migration] ' + str(len(storage['topics'] + ['by_slug'].keys())) + ' topics by slug') for item in content_data: slug = get_shout_slug(item) storage['content_items']['by_slug'][slug] = item storage['content_items']['by_oid'][item['_id']] = item print('[migration] ' + str(len(content_data)) + ' content items') for x in comments_data: - storage['comments']['by_oid'][x['_id']] = x + storage['reactions']['by_oid'][x['_id']] = x cid = x['contentItem'] - storage['comments']['by_content'][cid] = x + storage['reactions']['by_content'][cid] = x ci = storage['content_items']['by_oid'].get(cid, {}) - if 'slug' in ci: storage['comments']['by_slug'][ci['slug']] = x - print('[migration] ' + str(len(storage['comments']['by_content'].keys())) + ' with comments') + if 'slug' in ci: storage['reactions']['by_slug'][ci['slug']] = x + print('[migration] ' + str(len(storage['reactions'] + ['by_content'].keys())) + ' with comments') except Exception as e: raise e storage['users']['data'] = users_data storage['topics']['tags'] = tags_data storage['topics']['cats'] = cats_data storage['shouts']['data'] = content_data - storage['comments']['data'] = comments_data + storage['reactions']['data'] = comments_data return storage -if __name__ == '__main__': + +def mongo_download(url): + print('[migration] mongodb url: ' + url) + open('migration/data/mongodb.url', 'w').write(url) + logname = 'migration/data/mongo-' + TODAY + '.log' + subprocess.call([ + 'mongodump', + '--uri', url, + '--forceTableScan', + ], open(logname, 'w')) + + +def create_pgdump(): + # pg_dump -d discoursio > 20220714-pgdump.sql + subprocess.Popen( + [ 'pg_dump', '-d', 'discoursio' ], + stdout=open('migration/data/' + TODAY + '-pgdump.log', 'w'), + stderr = subprocess.STDOUT + ) + # scp 20220714-pgdump.sql root@build.discours.io:/root/discours-backend/. + subprocess.call([ + 'scp', + 'migration/data/' + TODAY + '-pgdump.sql', + 'root@build.discours.io:/root/discours-backend/.' + ]) + print('[migration] pg_dump up') + + +def handle_auto(): + print('[migration] no command given, auto mode') + import os + if os.path.isfile('migration/data/mongo-' + TODAY + '.log'): + url=open('migration/data/mongodb.url', 'r').read() + if not url: + url=prompt('provide mongo url:') + open('migration/data/mongodb.url', 'w').write(url) + mongo_download(url) + bson_handle() + all_handle(data_load(), sys.argv) + create_pgdump() + +def migrate(): import sys + if len(sys.argv) > 1: - cmd = sys.argv[1] + cmd=sys.argv[1] print('[migration] command: ' + cmd) - if cmd == 'bson': + if cmd == 'mongodb': + mongo_download(sys.argv[2]) + elif cmd == 'bson': bson_handle() else: - storage = data_load() + storage=data_load() if cmd == '-': export_one(sys.argv[2], storage) - else: all_handle(storage) - + else: all_handle(storage, sys.argv) + elif len(sys.argv) == 1: + handle_auto() else: - print('usage: python migrate.py bson') - print('.. \t- ') - print('.. \tall') + print('[migration] usage: python ./migration ') + print('[migration] commands: mongodb, bson, all, all mdx, - ') + +if __name__ == '__main__': + migrate() diff --git a/migration/export.py b/migration/export.py index a05d9b5f..1b5ef994 100644 --- a/migration/export.py +++ b/migration/export.py @@ -4,7 +4,7 @@ import json import os import frontmatter from migration.extract import extract_html, prepare_body -from migration.tables.users import migrate_email_subscription +# from migration.tables.users import migrate_email_subscription from migration.utils import DateTimeEncoder OLD_DATE = '2016-03-05 22:22:00.350000' @@ -63,17 +63,18 @@ def export_slug(slug, storage): def export_email_subscriptions(): email_subscriptions_data = json.loads(open('migration/data/email_subscriptions.json').read()) for data in email_subscriptions_data: - migrate_email_subscription(data) + # migrate_email_subscription(data) + pass print('[migration] ' + str(len(email_subscriptions_data)) + ' email subscriptions exported') def export_shouts(storage): # update what was just migrated or load json again if len(storage['users']['by_slugs'].keys()) == 0: storage['users']['by_slugs'] = json.loads(open(EXPORT_DEST + 'authors.json').read()) - print('[migration] ' + str(len(storage['users']['by_slugs'].keys())) + ' exported authors loaded') + print('[migration] ' + str(len(storage['users']['by_slugs'].keys())) + ' exported authors ') if len(storage['shouts']['by_slugs'].keys()) == 0: storage['shouts']['by_slugs'] = json.loads(open(EXPORT_DEST + 'articles.json').read()) - print('[migration] ' + str(len(storage['shouts']['by_slugs'].keys())) + ' exported articles loaded') + print('[migration] ' + str(len(storage['shouts']['by_slugs'].keys())) + ' exported articles ') for slug in storage['shouts']['by_slugs'].keys(): export_slug(slug, storage) def export_json(export_articles = {}, export_authors = {}, export_topics = {}, export_comments = {}): diff --git a/migration/tables/comments.py b/migration/tables/comments.py index 009023c8..d1147d7a 100644 --- a/migration/tables/comments.py +++ b/migration/tables/comments.py @@ -1,8 +1,10 @@ from datetime import datetime from dateutil.parser import parse as date_parse -from orm import Comment, CommentRating, User +from orm import Reaction, User +from orm import reaction from orm.base import local_session from migration.html2text import html2text +from orm.reaction import ReactionKind from orm.shout import Shout ts = datetime.now() @@ -27,80 +29,80 @@ def migrate(entry, storage): -> - type Comment { + type Reaction { id: Int! - createdBy: User! - body: String! - replyTo: Comment! - createdAt: DateTime! - updatedAt: DateTime shout: Shout! + createdAt: DateTime! + createdBy: User! + updatedAt: DateTime deletedAt: DateTime deletedBy: User - ratings: [CommentRating] - views: Int - } + range: String # full / 0:2340 + kind: ReactionKind! + body: String + replyTo: Reaction + stat: Stat + old_id: String + old_thread: String + } ''' - if entry.get('deleted'): return - comment_dict = {} + reaction_dict = {} # FIXME: comment_dict['createdAt'] = ts if not entry.get('createdAt') else date_parse(entry.get('createdAt')) # print('[migration] comment original date %r' % entry.get('createdAt')) # print('[migration] comment date %r ' % comment_dict['createdAt']) - comment_dict['body'] = html2text(entry.get('body', '')) - comment_dict['oid'] = entry['_id'] - if entry.get('createdAt'): comment_dict['createdAt'] = date_parse(entry.get('createdAt')) + reaction_dict['body'] = html2text(entry.get('body', '')) + reaction_dict['oid'] = entry['_id'] + if entry.get('createdAt'): reaction_dict['createdAt'] = date_parse(entry.get('createdAt')) shout_oid = entry.get('contentItem') if not shout_oid in storage['shouts']['by_oid']: - print('[migration] no shout for comment', entry) + if len(storage['shouts']['by_oid']) > 0: + return shout_oid + else: + print('[migration] no shouts migrated yet') + raise Exception + return else: with local_session() as session: author = session.query(User).filter(User.oid == entry['createdBy']).first() shout_dict = storage['shouts']['by_oid'][shout_oid] if shout_dict: - comment_dict['shout'] = shout_dict['slug'] - comment_dict['createdBy'] = author.slug if author else 'discours' - # FIXME if entry.get('deleted'): comment_dict['deletedAt'] = date_parse(entry['updatedAt']) or ts - # comment_dict['deletedBy'] = session.query(User).filter(User.oid == (entry.get('updatedBy') or dd['oid'])).first() - # FIXME if entry.get('updatedAt'): comment_dict['updatedAt'] = date_parse(entry['updatedAt']) or ts - #for [k, v] in comment_dict.items(): - # if not v: del comment_dict[f] - # if k.endswith('At'): - # try: comment_dict[k] = datetime(comment_dict[k]) - # except: print(k) - # # print('[migration] comment keys:', f) + reaction_dict['shout'] = shout_dict['slug'] + reaction_dict['createdBy'] = author.slug if author else 'discours' + reaction_dict['kind'] = ReactionKind.COMMENT - comment = Comment.create(**comment_dict) + # creating reaction from old comment + reaction = Reaction.create(**reaction_dict) - comment_dict['id'] = comment.id - comment_dict['ratings'] = [] - comment_dict['oid'] = entry['_id'] - # print(comment) + reaction_dict['id'] = reaction.id for comment_rating_old in entry.get('ratings',[]): rater = session.query(User).filter(User.oid == comment_rating_old['createdBy']).first() - if rater and comment: - comment_rating_dict = { - 'value': comment_rating_old['value'], - 'createdBy': rater.slug, - 'comment_id': comment.id - } - cts = comment_rating_old.get('createdAt') - if cts: comment_rating_dict['createdAt'] = date_parse(cts) - try: - CommentRating.create(**comment_rating_dict) - comment_dict['ratings'].append(comment_rating_dict) - except Exception as e: - print('[migration] comment rating error: %r' % comment_rating_dict) - raise e + reactedBy = rater if rater else session.query(User).filter(User.slug == 'noname').first() + re_reaction_dict = { + 'shout': reaction_dict['shout'], + 'replyTo': reaction.id, + 'kind': ReactionKind.LIKE if comment_rating_old['value'] > 0 else ReactionKind.DISLIKE, + 'createdBy': reactedBy.slug if reactedBy else 'discours' + } + cts = comment_rating_old.get('createdAt') + if cts: re_reaction_dict['createdAt'] = date_parse(cts) + try: + # creating reaction from old rating + Reaction.create(**re_reaction_dict) + except Exception as e: + print('[migration] comment rating error: %r' % re_reaction_dict) + raise e else: - print('[migration] error: cannot find shout for comment %r' % comment_dict) - return comment_dict + print('[migration] error: cannot find shout for comment %r' % reaction_dict) + return reaction -def migrate_2stage(cmt, old_new_id): - reply_oid = cmt.get('replyTo') +def migrate_2stage(rr, old_new_id): + reply_oid = rr.get('replyTo') if not reply_oid: return - new_id = old_new_id.get(cmt['_id']) + new_id = old_new_id.get(rr.get('oid')) if not new_id: return with local_session() as session: - comment = session.query(Comment).filter(Comment.id == new_id).first() + comment = session.query(Reaction).filter(Reaction.id == new_id).first() comment.replyTo = old_new_id.get(reply_oid) + comment.save() session.commit() + if not rr['body']: raise Exception(rr) diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py index f7d567bd..2ff7aa22 100644 --- a/migration/tables/content_items.py +++ b/migration/tables/content_items.py @@ -1,11 +1,13 @@ from dateutil.parser import parse as date_parse import sqlalchemy -from orm import Shout, ShoutTopic, ShoutRating, ShoutViewByDay, User +from orm.shout import Shout, ShoutTopic, User +from storages.viewed import ViewedByDay from transliterate import translit from datetime import datetime from orm.base import local_session from migration.extract import prepare_body from orm.community import Community +from orm.reaction import Reaction, ReactionKind OLD_DATE = '2016-03-05 22:22:00.350000' ts = datetime.now() @@ -33,8 +35,8 @@ def migrate(entry, storage): 'community': Community.default_community.id, 'authors': [], 'topics': set([]), - 'rating': 0, - 'ratings': [], + # 'rating': 0, + # 'ratings': [], 'createdAt': [] } topics_by_oid = storage['topics']['by_oid'] @@ -117,8 +119,8 @@ def migrate(entry, storage): shout_dict = r.copy() user = None del shout_dict['topics'] # FIXME: AttributeError: 'str' object has no attribute '_sa_instance_state' - del shout_dict['rating'] # FIXME: TypeError: 'rating' is an invalid keyword argument for Shout - del shout_dict['ratings'] + #del shout_dict['rating'] # FIXME: TypeError: 'rating' is an invalid keyword argument for Shout + #del shout_dict['ratings'] email = userdata.get('email') slug = userdata.get('slug') with local_session() as session: @@ -188,35 +190,36 @@ def migrate(entry, storage): print('[migration] ignored topic slug: \n%r' % tpc['slug']) # raise Exception - # shout ratings + # content_item ratings to reactions try: - shout_dict['ratings'] = [] - for shout_rating_old in entry.get('ratings',[]): + for content_rating in entry.get('ratings',[]): with local_session() as session: - rater = session.query(User).filter(User.oid == shout_rating_old['createdBy']).first() + rater = session.query(User).filter(User.oid == content_rating['createdBy']).first() + reactedBy = rater if rater else session.query(User).filter(User.slug == 'noname').first() if rater: - shout_rating_dict = { - 'value': shout_rating_old['value'], - 'rater': rater.slug, + reaction_dict = { + 'kind': ReactionKind.LIKE if content_rating['value'] > 0 else ReactionKind.DISLIKE, + 'createdBy': reactedBy.slug, 'shout': shout_dict['slug'] } - cts = shout_rating_old.get('createdAt') - if cts: shout_rating_dict['ts'] = date_parse(cts) - shout_rating = session.query(ShoutRating).\ - filter(ShoutRating.shout == shout_dict['slug']).\ - filter(ShoutRating.rater == rater.slug).first() - if shout_rating: - shout_rating_dict['value'] = int(shout_rating_dict['value'] or 0) + int(shout_rating.value or 0) - shout_rating.update(shout_rating_dict) - else: ShoutRating.create(**shout_rating_dict) - shout_dict['ratings'].append(shout_rating_dict) + cts = content_rating.get('createdAt') + if cts: reaction_dict['createdAt'] = date_parse(cts) + reaction = session.query(Reaction).\ + filter(Reaction.shout == reaction_dict['shout']).\ + filter(Reaction.createdBy == reaction_dict['createdBy']).\ + filter(Reaction.kind == reaction_dict['kind']).first() + if reaction: + reaction_dict['kind'] = ReactionKind.AGREE if content_rating['value'] > 0 else ReactionKind.DISAGREE, + reaction.update(reaction_dict) + else: Reaction.create(**reaction_dict) + # shout_dict['ratings'].append(reaction_dict) except: - print('[migration] shout rating error: \n%r' % shout_rating_old) - # raise Exception + print('[migration] content_item.ratings error: \n%r' % content_rating) + raise Exception # shout views - ShoutViewByDay.create( shout = shout_dict['slug'], value = entry.get('views', 1) ) - del shout_dict['ratings'] + ViewedByDay.create( shout = shout_dict['slug'], value = entry.get('views', 1) ) + # del shout_dict['ratings'] shout_dict['oid'] = entry.get('_id') storage['shouts']['by_oid'][entry['_id']] = shout_dict storage['shouts']['by_slug'][slug] = shout_dict diff --git a/migration/tables/users.py b/migration/tables/users.py index bd5981cd..bbae0cec 100644 --- a/migration/tables/users.py +++ b/migration/tables/users.py @@ -1,6 +1,5 @@ import sqlalchemy from orm import User, UserRating -from orm.user import EmailSubscription from dateutil.parser import parse from orm.base import local_session @@ -41,7 +40,7 @@ def migrate(entry): # name fn = entry['profile'].get('firstName', '') ln = entry['profile'].get('lastName', '') - name = user_dict['slug'] if user_dict['slug'] else 'anonymous' + name = user_dict['slug'] if user_dict['slug'] else 'noname' name = fn if fn else name name = (name + ' ' + ln) if ln else name name = entry['profile']['path'].lower().replace(' ', '-') if len(name) < 2 else name @@ -76,12 +75,6 @@ def migrate(entry): user_dict['id'] = user.id return user_dict -def migrate_email_subscription(entry): - res = {} - res["email"] = entry["email"] - res["createdAt"] = parse(entry["createdAt"]) - EmailSubscription.create(**res) - def migrate_2stage(entry, id_map): ce = 0 for rating_entry in entry.get('ratings',[]): diff --git a/orm/__init__.py b/orm/__init__.py index 0f37ca12..d196bd10 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -1,18 +1,19 @@ -from orm.rbac import Operation, Resource, Permission, Role, RoleStorage -from orm.community import Community, CommunitySubscription -from orm.user import User, UserRating, UserRole, UserStorage -from orm.topic import Topic, TopicSubscription, TopicStorage +from orm.rbac import Operation, Resource, Permission, Role +from storages.roles import RoleStorage +from orm.community import Community +from orm.user import User, UserRating +from orm.topic import Topic, TopicFollower from orm.notification import Notification -from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay,\ - ShoutRatingStorage, ShoutViewStorage, ShoutCommentsSubscription +from orm.shout import Shout +from orm.reaction import Reaction +from storages.topics import TopicStorage +from storages.users import UserStorage +from storages.viewed import ViewedStorage from orm.base import Base, engine, local_session -from orm.comment import Comment, CommentRating #, CommentRatingStorage -from orm.proposal import Proposal, ProposalRating #, ProposalRatingStorage -__all__ = ["User", "Role", "Community", "Operation", \ - "Permission", "Shout", "Topic", "TopicSubscription", \ - "Notification", "ShoutRating", "Comment", "CommentRating", \ - "UserRating", "Proposal", "ProposalRating"] +__all__ = ["User", "Role", "Operation", "Permission", \ + "Community", "Shout", "Topic", "TopicFollower", \ + "Notification", "Reaction", "UserRating"] Base.metadata.create_all(engine) Operation.init_table() @@ -22,10 +23,7 @@ Community.init_table() Role.init_table() with local_session() as session: - ShoutRatingStorage.init(session) - # CommentRatingStorage.init(session) - # ProposalRatingStorage.init(session) - ShoutViewStorage.init(session) + ViewedStorage.init(session) RoleStorage.init(session) UserStorage.init(session) TopicStorage.init(session) diff --git a/orm/comment.py b/orm/comment.py deleted file mode 100644 index af25dae6..00000000 --- a/orm/comment.py +++ /dev/null @@ -1,30 +0,0 @@ -from typing import List -from datetime import datetime - -from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Boolean -from sqlalchemy.orm import relationship - -from orm.base import Base - -class CommentRating(Base): - __tablename__ = "comment_rating" - - id = None - comment_id = Column(ForeignKey('comment.id'), primary_key = True) - createdBy = Column(ForeignKey('user.slug'), primary_key = True) - createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Timestamp") - value = Column(Integer) - -class Comment(Base): - __tablename__ = 'comment' - body: str = Column(String, nullable=False, comment="Comment Body") - createdAt = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") - createdBy: str = Column(ForeignKey("user.slug"), nullable=False, comment="Sender") - updatedAt = Column(DateTime, nullable=True, comment="Updated at") - updatedBy = Column(ForeignKey("user.slug"), nullable=True, comment="Last Editor") - deletedAt = Column(DateTime, nullable=True, comment="Deleted at") - deletedBy = Column(ForeignKey("user.slug"), nullable=True, comment="Deleted by") - shout = Column(ForeignKey("shout.slug"), nullable=False) - replyTo: int = Column(ForeignKey("comment.id"), nullable=True, comment="comment ID") - ratings = relationship(CommentRating, foreign_keys=CommentRating.comment_id) - oid: str = Column(String, nullable=True) \ No newline at end of file diff --git a/orm/community.py b/orm/community.py index 7ecb917f..5e9624d5 100644 --- a/orm/community.py +++ b/orm/community.py @@ -1,14 +1,12 @@ from datetime import datetime -from enum import unique -from sqlalchemy import Column, Integer, String, ForeignKey, DateTime -from sqlalchemy.orm import relationship, backref +from sqlalchemy import Column, String, ForeignKey, DateTime from orm.base import Base, local_session -class CommunitySubscription(Base): - __tablename__ = 'community_subscription' +class CommunityFollower(Base): + __tablename__ = 'community_followers' id = None - subscriber = Column(ForeignKey('user.slug'), primary_key = True) + follower = Column(ForeignKey('user.slug'), primary_key = True) community = Column(ForeignKey('community.slug'), primary_key = True) createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") @@ -21,7 +19,7 @@ class Community(Base): desc: str = Column(String, nullable=False, default='') pic: str = Column(String, nullable=False, default='') createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") - createdBy: str = Column(ForeignKey("user.slug"), nullable=False, comment="Creator") + createdBy: str = Column(ForeignKey("user.slug"), nullable=False, comment="Author") @staticmethod def init_table(): diff --git a/orm/notification.py b/orm/notification.py index 6780ab12..a9f274a4 100644 --- a/orm/notification.py +++ b/orm/notification.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, Integer, String, ForeignKey, JSON as JSONType +from sqlalchemy import Column, String, JSON as JSONType from orm.base import Base class Notification(Base): @@ -6,4 +6,6 @@ class Notification(Base): kind: str = Column(String, unique = True, primary_key = True) template: str = Column(String, nullable = False) - variables: JSONType = Column(JSONType, nullable = True) # [ , .. ] \ No newline at end of file + variables: JSONType = Column(JSONType, nullable = True) # [ , .. ] + + # FIXME looks like frontend code \ No newline at end of file diff --git a/orm/proposal.py b/orm/proposal.py deleted file mode 100644 index aef6fa1c..00000000 --- a/orm/proposal.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing import List -from datetime import datetime -from sqlalchemy import Column, Integer, String, ForeignKey, DateTime -from sqlalchemy.orm import relationship -from orm import Permission -from orm.base import Base - - -class ProposalRating(Base): - __tablename__ = "proposal_rating" - - id = None - proposal_id = Column(ForeignKey('proposal.id'), primary_key = True) - createdBy = Column(ForeignKey('user.slug'), primary_key = True) - createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Timestamp") - value = Column(Integer) - -class Proposal(Base): - __tablename__ = 'proposal' - - shout: str = Column(String, ForeignKey("shout.slug"), nullable=False, comment="Shout") - range: str = Column(String, nullable=True, comment="Range in format :") - body: str = Column(String, nullable=False, comment="Body") - createdBy: int = Column(Integer, ForeignKey("user.id"), nullable=False, comment="Author") - createdAt: str = Column(DateTime, nullable=False, comment="Created at") - updatedAt: str = Column(DateTime, nullable=True, comment="Updated at") - acceptedAt: str = Column(DateTime, nullable=True, comment="Accepted at") - acceptedBy: str = Column(Integer, ForeignKey("user.id"), nullable=True, comment="Accepted by") - declinedAt: str = Column(DateTime, nullable=True, comment="Declined at") - declinedBy: str = Column(Integer, ForeignKey("user.id"), nullable=True, comment="Declined by") - ratings = relationship(ProposalRating, foreign_keys=ProposalRating.proposal_id) - deletedAt: str = Column(DateTime, nullable=True, comment="Deleted at") - # TODO: debug, logix \ No newline at end of file diff --git a/orm/rbac.py b/orm/rbac.py index b72bfe37..4274e14d 100644 --- a/orm/rbac.py +++ b/orm/rbac.py @@ -1,11 +1,6 @@ import warnings - -from typing import Type -import asyncio - -from sqlalchemy import String, Integer, Column, ForeignKey, UniqueConstraint, TypeDecorator -from sqlalchemy.orm import relationship, selectinload - +from sqlalchemy import String, Column, ForeignKey, UniqueConstraint, TypeDecorator +from sqlalchemy.orm import relationship from orm.base import Base, REGISTRY, engine, local_session from orm.community import Community @@ -88,34 +83,6 @@ class Permission(Base): operation_id: int = Column(ForeignKey("operation.id", ondelete="CASCADE"), nullable=False, comment="Operation") resource_id: int = Column(ForeignKey("resource.id", ondelete="CASCADE"), nullable=False, comment="Resource") -class RoleStorage: - roles = {} - lock = asyncio.Lock() - - @staticmethod - def init(session): - self = RoleStorage - roles = session.query(Role).\ - options(selectinload(Role.permissions)).all() - self.roles = dict([(role.id, role) for role in roles]) - - @staticmethod - async def get_role(id): - self = RoleStorage - async with self.lock: - return self.roles.get(id) - - @staticmethod - async def add_role(role): - self = RoleStorage - async with self.lock: - self.roles[id] = role - - @staticmethod - async def del_role(id): - self = RoleStorage - async with self.lock: - del self.roles[id] if __name__ == '__main__': Base.metadata.create_all(engine) diff --git a/orm/reaction.py b/orm/reaction.py new file mode 100644 index 00000000..ee00f38d --- /dev/null +++ b/orm/reaction.py @@ -0,0 +1,51 @@ +from datetime import datetime +from sqlalchemy import Column, String, ForeignKey, DateTime +from orm.base import Base, local_session +import enum +from sqlalchemy import Enum + +from storages.viewed import ViewedStorage + +class ReactionKind(enum.Enum): + AGREE = 1 # +1 + DISAGREE = 2 # -1 + PROOF = 3 # +1 + DISPROOF = 4 # -1 + ASK = 5 # +0 + PROPOSE = 6 # +0 + QOUTE = 7 # +0 + COMMENT = 8 # +0 + ACCEPT = 9 # +1 + REJECT = 0 # -1 + LIKE = 11 # +1 + DISLIKE = 12 # -1 + # TYPE = # rating change guess + + +class Reaction(Base): + __tablename__ = 'reaction' + body: str = Column(String, nullable=True, comment="Reaction Body") + createdAt = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") + createdBy: str = Column(ForeignKey("user.slug"), nullable=False, comment="Sender") + updatedAt = Column(DateTime, nullable=True, comment="Updated at") + updatedBy = Column(ForeignKey("user.slug"), nullable=True, comment="Last Editor") + deletedAt = Column(DateTime, nullable=True, comment="Deleted at") + deletedBy = Column(ForeignKey("user.slug"), nullable=True, comment="Deleted by") + shout = Column(ForeignKey("shout.slug"), nullable=False) + replyTo: int = Column(ForeignKey("reaction.id"), nullable=True, comment="Reply to reaction ID") + range: str = Column(String, nullable=True, comment="Range in format :") + kind: int = Column(Enum(ReactionKind), nullable=False, comment="Reaction kind") + oid: str = Column(String, nullable=True, comment="Old ID") + + @property + async def stat(self) -> dict: + reacted = 0 + try: + with local_session() as session: + reacted = session.query(Reaction).filter(Reaction.replyTo == self.id).count() + except Exception as e: + print(e) + return { + "viewed": await ViewedStorage.get_reaction(self.slug), + "reacted": reacted + } \ No newline at end of file diff --git a/orm/shout.py b/orm/shout.py index 22de6c6e..13713432 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -1,24 +1,22 @@ -from typing import List -from datetime import datetime, timedelta -from sqlalchemy import Table, Column, Integer, String, ForeignKey, DateTime, Boolean, func +from datetime import datetime +from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Boolean from sqlalchemy.orm import relationship -from sqlalchemy.orm.attributes import flag_modified -from orm import Permission, User, Topic, TopicSubscription -from orm.comment import Comment -from orm.base import Base, local_session +from orm.user import User +from orm.topic import Topic, ShoutTopic +from orm.reaction import Reaction +from storages.reactions import ReactionsStorage +from storages.viewed import ViewedStorage +from orm.base import Base -from functools import reduce -import asyncio - -class ShoutCommentsSubscription(Base): - __tablename__ = "shout_comments_subscription" +class ShoutReactionsFollower(Base): + __tablename__ = "shout_reactions_followers" id = None - subscriber = Column(ForeignKey('user.slug'), primary_key = True) + follower = Column(ForeignKey('user.slug'), primary_key = True) shout = Column(ForeignKey('shout.slug'), primary_key = True) - createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") auto = Column(Boolean, nullable=False, default = False) + createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") deletedAt: str = Column(DateTime, nullable=True) class ShoutAuthor(Base): @@ -28,300 +26,13 @@ class ShoutAuthor(Base): shout = Column(ForeignKey('shout.slug'), primary_key = True) user = Column(ForeignKey('user.slug'), primary_key = True) -class ShoutViewer(Base): - __tablename__ = "shout_viewer" +class ShoutAllowed(Base): + __tablename__ = "shout_allowed" id = None shout = Column(ForeignKey('shout.slug'), primary_key = True) user = Column(ForeignKey('user.id'), primary_key = True) -class ShoutTopic(Base): - __tablename__ = 'shout_topic' - - id = None - shout = Column(ForeignKey('shout.slug'), primary_key = True) - topic = Column(ForeignKey('topic.slug'), primary_key = True) - -class ShoutRating(Base): - __tablename__ = "shout_rating" - - id = None - rater = Column(ForeignKey('user.slug'), primary_key = True) - shout = Column(ForeignKey('shout.slug'), primary_key = True) - ts = Column(DateTime, nullable=False, default = datetime.now, comment="Timestamp") - value = Column(Integer) - -class ShoutRatingStorage: - - ratings = [] - - lock = asyncio.Lock() - - @staticmethod - def init(session): - ShoutRatingStorage.ratings = session.query(ShoutRating).all() - - @staticmethod - async def get_total_rating(shout_slug): - async with ShoutRatingStorage.lock: - shout_ratings = list(filter(lambda x: x.shout == shout_slug, ShoutRatingStorage.ratings)) - return reduce((lambda x, y: x + y.value), shout_ratings, 0) - - @staticmethod - async def get_ratings(shout_slug): - async with ShoutRatingStorage.lock: - shout_ratings = list(filter(lambda x: x.shout == shout_slug, ShoutRatingStorage.ratings)) - return shout_ratings - - @staticmethod - async def update_rating(new_rating): - async with ShoutRatingStorage.lock: - rating = next((x for x in ShoutRatingStorage.ratings \ - if x.rater == new_rating.rater and x.shout == new_rating.shout), None) - if rating: - rating.value = new_rating.value - rating.ts = new_rating.ts - else: - ShoutRatingStorage.ratings.append(new_rating) - - -class ShoutViewByDay(Base): - __tablename__ = "shout_view_by_day" - - id = None - shout = Column(ForeignKey('shout.slug'), primary_key = True) - day = Column(DateTime, primary_key = True, default = datetime.now) - value = Column(Integer) - -class ShoutViewStorage: - - view_by_shout = {} - this_day_views = {} - to_flush = [] - - period = 30*60 #sec - - lock = asyncio.Lock() - - @staticmethod - def init(session): - self = ShoutViewStorage - views = session.query(ShoutViewByDay).all() - for view in views: - shout = view.shout - value = view.value - old_value = self.view_by_shout.get(shout, 0) - self.view_by_shout[shout] = old_value + value; - if not shout in self.this_day_views: - self.this_day_views[shout] = view - this_day_view = self.this_day_views[shout] - if this_day_view.day < view.day: - self.this_day_views[shout] = view - - @staticmethod - async def get_view(shout_slug): - self = ShoutViewStorage - async with self.lock: - return self.view_by_shout.get(shout_slug, 0) - - @staticmethod - async def inc_view(shout_slug): - self = ShoutViewStorage - async with self.lock: - this_day_view = self.this_day_views.get(shout_slug) - day_start = datetime.now().replace(hour = 0, minute = 0, second = 0) - if not this_day_view or this_day_view.day < day_start: - if this_day_view and getattr(this_day_view, "modified", False): - self.to_flush.append(this_day_view) - this_day_view = ShoutViewByDay.create(shout = shout_slug, value = 1) - self.this_day_views[shout_slug] = this_day_view - else: - this_day_view.value = this_day_view.value + 1 - - this_day_view.modified = True - - old_value = self.view_by_shout.get(shout_slug, 0) - self.view_by_shout[shout_slug] = old_value + 1; - - @staticmethod - async def flush_changes(session): - self = ShoutViewStorage - async with self.lock: - for view in self.this_day_views.values(): - if getattr(view, "modified", False): - session.add(view) - flag_modified(view, "value") - view.modified = False - for view in self.to_flush: - session.add(view) - self.to_flush.clear() - session.commit() - - @staticmethod - async def worker(): - print("[shout.views] worker start") - while True: - try: - print("[shout.views] worker flush changes") - with local_session() as session: - await ShoutViewStorage.flush_changes(session) - except Exception as err: - print("[shout.views] worker error: %s" % (err)) - await asyncio.sleep(ShoutViewStorage.period) - -class TopicStat: - shouts_by_topic = {} - authors_by_topic = {} - subs_by_topic = {} - views_by_topic = {} - lock = asyncio.Lock() - - period = 30*60 #sec - - @staticmethod - async def load_stat(session): - self = TopicStat - - self.shouts_by_topic = {} - self.authors_by_topic = {} - self.subs_by_topic = {} - self.views_by_topic = {} - - shout_topics = session.query(ShoutTopic) - for shout_topic in shout_topics: - topic = shout_topic.topic - shout = shout_topic.shout - if topic in self.shouts_by_topic: - self.shouts_by_topic[topic].append(shout) - else: - self.shouts_by_topic[topic] = [shout] - - authors = await ShoutAuthorStorage.get_authors(shout) - if topic in self.authors_by_topic: - self.authors_by_topic[topic].update(authors) - else: - self.authors_by_topic[topic] = set(authors) - - old_views = self.views_by_topic.get(topic, 0) - self.views_by_topic[topic] = old_views + await ShoutViewStorage.get_view(shout) - - subs = session.query(TopicSubscription) - for sub in subs: - topic = sub.topic - user = sub.subscriber - if topic in self.subs_by_topic: - self.subs_by_topic[topic].append(user) - else: - self.subs_by_topic[topic] = [user] - - @staticmethod - async def get_shouts(topic): - self = TopicStat - async with self.lock: - return self.shouts_by_topic.get(topic, []) - - @staticmethod - async def get_stat(topic): - self = TopicStat - async with self.lock: - shouts = self.shouts_by_topic.get(topic, []) - subs = self.subs_by_topic.get(topic, []) - authors = self.authors_by_topic.get(topic, []) - views = self.views_by_topic.get(topic, 0) - - return { - "shouts" : len(shouts), - "authors" : len(authors), - "subscriptions" : len(subs), - "views" : views - } - - @staticmethod - async def worker(): - self = TopicStat - print("[topic.stats] worker start") - while True: - try: - print("[topic.stats] worker load stat") - with local_session() as session: - async with self.lock: - await self.load_stat(session) - except Exception as err: - print("[topic.stats] worker error: %s" % (err)) - await asyncio.sleep(self.period) - -class ShoutAuthorStorage: - authors_by_shout = {} - lock = asyncio.Lock() - - period = 30*60 #sec - - @staticmethod - async def load(session): - self = ShoutAuthorStorage - authors = session.query(ShoutAuthor) - for author in authors: - user = author.user - shout = author.shout - if shout in self.authors_by_shout: - self.authors_by_shout[shout].append(user) - else: - self.authors_by_shout[shout] = [user] - - @staticmethod - async def get_authors(shout): - self = ShoutAuthorStorage - async with self.lock: - return self.authors_by_shout.get(shout, []) - - @staticmethod - async def worker(): - self = ShoutAuthorStorage - print("[shout.authors] worker start") - while True: - try: - print("[shout.authors] worker load stat") - with local_session() as session: - async with self.lock: - await self.load(session) - except Exception as err: - print("[shout.authors] worker error: %s" % (err)) - await asyncio.sleep(self.period) - -class CommentStat: - stat_by_topic = {} - lock = asyncio.Lock() - - period = 30*60 #sec - - @staticmethod - async def load(session): - self = CommentStat - - stats = session.query(Comment.shout, func.count(Comment.id).label("count")).\ - group_by(Comment.shout) - self.stat_by_topic = dict([(stat.shout, stat.count) for stat in stats]) - - @staticmethod - async def get_stat(shout): - self = CommentStat - async with self.lock: - return self.stat_by_topic.get(shout, 0) - - @staticmethod - async def worker(): - self = CommentStat - print("[comment.stats] worker start") - while True: - try: - print("[comment.stats] worker load stat") - with local_session() as session: - async with self.lock: - await self.load(session) - except Exception as err: - print("[comment.stats] worker error: %s" % (err)) - await asyncio.sleep(self.period) - class Shout(Base): __tablename__ = 'shout' @@ -340,19 +51,18 @@ class Shout(Base): cover: str = Column(String, nullable = True) title: str = Column(String, nullable = True) subtitle: str = Column(String, nullable = True) - comments = relationship(Comment) layout: str = Column(String, nullable = True) - authors = relationship(lambda: User, secondary=ShoutAuthor.__tablename__) # NOTE: multiple authors + reactions = relationship(lambda: Reaction) + authors = relationship(lambda: User, secondary=ShoutAuthor.__tablename__) topics = relationship(lambda: Topic, secondary=ShoutTopic.__tablename__) mainTopic = Column(ForeignKey("topic.slug"), nullable=True) - visibleFor = relationship(lambda: User, secondary=ShoutViewer.__tablename__) + visibleFor = relationship(lambda: User, secondary=ShoutAllowed.__tablename__) draft: bool = Column(Boolean, default=True) oid: str = Column(String, nullable=True) @property - async def stat(self): + async def stat(self) -> dict: return { - "views": await ShoutViewStorage.get_view(self.slug), - "comments": await CommentStat.get_stat(self.slug), - "ratings": await ShoutRatingStorage.get_total_rating(self.slug) + "viewed": await ViewedStorage.get_shout(self.slug), + "reacted": await ReactionsStorage.by_shout(self.slug) } diff --git a/orm/topic.py b/orm/topic.py index 3eecaa3c..a2e0e71f 100644 --- a/orm/topic.py +++ b/orm/topic.py @@ -1,15 +1,18 @@ from datetime import datetime -from sqlalchemy import Table, Column, Integer, String, ForeignKey, DateTime, JSON as JSONType -from sqlalchemy.orm import relationship +from sqlalchemy import Column, String, ForeignKey, DateTime, JSON as JSONType from orm.base import Base -import asyncio - -class TopicSubscription(Base): - __tablename__ = "topic_subscription" +class ShoutTopic(Base): + __tablename__ = 'shout_topic' id = None - subscriber = Column(ForeignKey('user.slug'), primary_key = True) + shout = Column(ForeignKey('shout.slug'), primary_key = True) + topic = Column(ForeignKey('topic.slug'), primary_key = True) +class TopicFollower(Base): + __tablename__ = "topic_followers" + + id = None + follower = Column(ForeignKey('user.slug'), primary_key = True) topic = Column(ForeignKey('topic.slug'), primary_key = True) createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") @@ -26,47 +29,3 @@ class Topic(Base): community = Column(ForeignKey("community.slug"), nullable=False, comment="Community") oid: str = Column(String, nullable=True, comment="Old ID") -class TopicStorage: - topics = {} - lock = asyncio.Lock() - - @staticmethod - def init(session): - self = TopicStorage - topics = session.query(Topic) - self.topics = dict([(topic.slug, topic) for topic in topics]) - for topic in self.topics.values(): - self.load_parents(topic) - - @staticmethod - def load_parents(topic): - self = TopicStorage - parents = [] - for parent in self.topics.values(): - if topic.slug in parent.children: - parents.append(parent.slug) - topic.parents = parents - return topic - - @staticmethod - async def get_topics(slugs): - self = TopicStorage - async with self.lock: - if not slugs: - return self.topics.values() - topics = filter(lambda topic: topic.slug in slugs, self.topics.values()) - return list(topics) - - @staticmethod - async def get_topics_by_community(community): - self = TopicStorage - async with self.lock: - topics = filter(lambda topic: topic.community == community, self.topics.values()) - return list(topics) - - @staticmethod - async def add_topic(topic): - self = TopicStorage - async with self.lock: - self.topics[topic.slug] = topic - self.load_parents(topic) diff --git a/orm/user.py b/orm/user.py index 93062159..2af07fdd 100644 --- a/orm/user.py +++ b/orm/user.py @@ -1,14 +1,9 @@ -from typing import List from datetime import datetime - -from sqlalchemy import Table, Column, Integer, String, ForeignKey, Boolean, DateTime, JSON as JSONType -from sqlalchemy.orm import relationship, selectinload - +from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, DateTime, JSON as JSONType +from sqlalchemy.orm import relationship from orm.base import Base, local_session -from orm.rbac import Role, RoleStorage -from orm.topic import Topic - -import asyncio +from orm.rbac import Role +from storages.roles import RoleStorage class UserNotifications(Base): __tablename__ = 'user_notifications' @@ -33,21 +28,14 @@ class UserRole(Base): user_id = Column(ForeignKey('user.id'), primary_key = True) role_id = Column(ForeignKey('role.id'), primary_key = True) -class AuthorSubscription(Base): - __tablename__ = "author_subscription" +class AuthorFollower(Base): + __tablename__ = "author_follower" id = None - subscriber = Column(ForeignKey('user.slug'), primary_key = True) + follower = Column(ForeignKey('user.slug'), primary_key = True) author = Column(ForeignKey('user.slug'), primary_key = True) createdAt = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") -class EmailSubscription(Base): - __tablename__ = "email_subscription" - - id = None - email = Column(String, primary_key = True) - createdAt = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") - class User(Base): __tablename__ = "user" @@ -95,43 +83,6 @@ class User(Base): scope[p.resource_id].add(p.operation_id) return scope -class UserStorage: - users = {} - lock = asyncio.Lock() - - @staticmethod - def init(session): - self = UserStorage - users = session.query(User).\ - options(selectinload(User.roles)).all() - self.users = dict([(user.id, user) for user in users]) - - @staticmethod - async def get_user(id): - self = UserStorage - async with self.lock: - return self.users.get(id) - - @staticmethod - async def get_user_by_slug(slug): - self = UserStorage - async with self.lock: - for user in self.users.values(): - if user.slug == slug: - return user - - @staticmethod - async def add_user(user): - self = UserStorage - async with self.lock: - self.users[user.id] = user - - @staticmethod - async def del_user(id): - self = UserStorage - async with self.lock: - del self.users[id] - if __name__ == "__main__": print(User.get_permission(user_id=1)) diff --git a/pyproject.toml b/pyproject.toml index 7989c572..69e890bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "discoursio-api" -version = "0.1.0" +version = "0.2.0" description = "" authors = ["Discours DevTeam "] license = "MIT" diff --git a/resolvers/__init__.py b/resolvers/__init__.py index 0941c76b..bb16fd52 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -1,14 +1,12 @@ from resolvers.auth import login, sign_out, is_email_used, register, confirm, auth_forget, auth_reset -from resolvers.zine import get_shout_by_slug, subscribe, unsubscribe, view_shout, rate_shout, \ +from resolvers.zine import get_shout_by_slug, follow, unfollow, view_shout, \ top_month, top_overall, recent_published, recent_all, top_viewed, \ shouts_by_authors, shouts_by_topics, shouts_by_communities -from resolvers.profile import get_users_by_slugs, get_current_user, shouts_reviewed -from resolvers.topics import topic_subscribe, topic_unsubscribe, topics_by_author, \ - topics_by_community, topics_by_slugs -from resolvers.comments import create_comment, delete_comment, update_comment, rate_comment -from resolvers.collab import get_shout_proposals, create_proposal, delete_proposal, \ - update_proposal, rate_proposal, decline_proposal, disable_proposal, accept_proposal, \ - invite_author, remove_author +from resolvers.profile import get_users_by_slugs, get_current_user, get_user_reacted_shouts, get_user_roles +from resolvers.topics import topic_follow, topic_unfollow, topics_by_author, topics_by_community, topics_by_slugs +# from resolvers.feed import shouts_for_feed, my_candidates +from resolvers.reactions import create_reaction, delete_reaction, update_reaction, get_all_reactions +from resolvers.collab import invite_author, remove_author from resolvers.editor import create_shout, delete_shout, update_shout from resolvers.community import create_community, delete_community, get_community, get_communities @@ -20,36 +18,43 @@ __all__ = [ "confirm", "auth_forget", "auth_reset" + "sign_out", # profile "get_current_user", "get_users_by_slugs", # zine + "shouts_for_feed", + "my_candidates", "recent_published", + "recent_reacted", "recent_all", "shouts_by_topics", "shouts_by_authors", "shouts_by_communities", - "shouts_reviewed", + "get_user_reacted_shouts", "top_month", "top_overall", "top_viewed", - "rate_shout", "view_shout", + "view_reaction", "get_shout_by_slug", # editor "create_shout", "update_shout", "delete_shout", + # collab + "invite_author", + "remove_author" # topics "topics_by_slugs", "topics_by_community", "topics_by_author", - "topic_subscribe", - "topic_unsubscribe", + "topic_follow", + "topic_unfollow", # communities "get_community", @@ -57,22 +62,12 @@ __all__ = [ "create_community", "delete_community", - # comments - "get_shout_comments", - "comments_subscribe", - "comments_unsubscribe", - "create_comment", - "update_comment", - "delete_comment", - - # collab - "get_shout_proposals", - "create_proposal", - "update_proposal", - "disable_proposal", - "accept_proposal", - "decline_proposal", - "delete_proposal", - "invite_author", - "remove_author" + # reactions + "get_shout_reactions", + "reactions_follow", + "reactions_unfollow", + "create_reaction", + "update_reaction", + "delete_reaction", + "get_all_reactions", ] diff --git a/resolvers/auth.py b/resolvers/auth.py index c8cd86ae..fca657fc 100644 --- a/resolvers/auth.py +++ b/resolvers/auth.py @@ -1,7 +1,6 @@ from graphql import GraphQLResolveInfo from transliterate import translit from urllib.parse import quote_plus - from auth.authenticate import login_required, ResetPassword from auth.authorize import Authorize from auth.identity import Identity @@ -12,7 +11,6 @@ from orm.base import local_session from resolvers.base import mutation, query from resolvers.profile import get_user_info from exceptions import InvalidPassword, InvalidToken - from settings import JWT_AUTH_HEADER @mutation.field("confirmEmail") diff --git a/resolvers/collab.py b/resolvers/collab.py index 0c881779..f8f05beb 100644 --- a/resolvers/collab.py +++ b/resolvers/collab.py @@ -1,221 +1,9 @@ -import asyncio -from orm import Proposal, ProposalRating, UserStorage +from datetime import datetime from orm.base import local_session from orm.shout import Shout -from sqlalchemy.orm import selectinload from orm.user import User -from resolvers.base import mutation, query +from resolvers.base import mutation from auth.authenticate import login_required -from datetime import datetime - - -class ProposalResult: - def __init__(self, status, proposal): - self.status = status - self.proposal = proposal - -class ProposalStorage: - lock = asyncio.Lock() - subscriptions = [] - - @staticmethod - async def register_subscription(subs): - async with ProposalStorage.lock: - ProposalStorage.subscriptions.append(subs) - - @staticmethod - async def del_subscription(subs): - async with ProposalStorage.lock: - ProposalStorage.subscriptions.remove(subs) - - @staticmethod - async def put(message_result): - async with ProposalStorage.lock: - for subs in ProposalStorage.subscriptions: - if message_result.message["chatId"] == subs.chat_id: - subs.queue.put_nowait(message_result) - - - -@query.field("getShoutProposals") -@login_required -async def get_shout_proposals(_, info, slug): - auth = info.context["request"].auth - user_id = auth.user_id - with local_session() as session: - proposals = session.query(Proposal).\ - options(selectinload(Proposal.ratings)).\ - filter(Proposal.shout == slug).\ - group_by(Proposal.id).all() - shout = session.query(Shout).filter(Shout.slug == slug).first() - authors = [author.id for author in shout.authors] - if user_id not in authors: - return {"error": "access denied"} - for proposal in proposals: - proposal.createdBy = await UserStorage.get_user(proposal.createdBy) - return proposals - - -@mutation.field("createProposal") -@login_required -async def create_proposal(_, info, body, shout, range = None): - auth = info.context["request"].auth - user_id = auth.user_id - - proposal = Proposal.create( - createdBy = user_id, - body = body, - shout = shout, - range = range - ) - - result = ProposalResult("NEW", proposal) - await ProposalStorage.put(result) - - return {"proposal": proposal} - -@mutation.field("updateProposal") -@login_required -async def update_proposal(_, info, id, body): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - proposal = session.query(Proposal).filter(Proposal.id == id).first() - shout = session.query(Shout).filter(Shout.sllug == proposal.shout).first() - authors = [author.id for author in shout.authors] - if not proposal: - return {"error": "invalid proposal id"} - if proposal.author in authors: - return {"error": "access denied"} - proposal.body = body - proposal.updatedAt = datetime.now() - session.commit() - - result = ProposalResult("UPDATED", proposal) - await ProposalStorage.put(result) - - return {"proposal": proposal} - -@mutation.field("deleteProposal") -@login_required -async def delete_proposal(_, info, id): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - proposal = session.query(Proposal).filter(Proposal.id == id).first() - if not proposal: - return {"error": "invalid proposal id"} - if proposal.createdBy != user_id: - return {"error": "access denied"} - - proposal.deletedAt = datetime.now() - session.commit() - - result = ProposalResult("DELETED", proposal) - await ProposalStorage.put(result) - - return {} - -@mutation.field("disableProposal") -@login_required -async def disable_proposal(_, info, id): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - proposal = session.query(Proposal).filter(Proposal.id == id).first() - if not proposal: - return {"error": "invalid proposal id"} - if proposal.createdBy != user_id: - return {"error": "access denied"} - - proposal.deletedAt = datetime.now() - session.commit() - - result = ProposalResult("DISABLED", proposal) - await ProposalStorage.put(result) - - return {} - -@mutation.field("rateProposal") -@login_required -async def rate_proposal(_, info, id, value): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - proposal = session.query(Proposal).filter(Proposal.id == id).first() - if not proposal: - return {"error": "invalid proposal id"} - - rating = session.query(ProposalRating).\ - filter(ProposalRating.proposal_id == id and ProposalRating.createdBy == user_id).first() - if rating: - rating.value = value - session.commit() - - if not rating: - ProposalRating.create( - proposal_id = id, - createdBy = user_id, - value = value) - - result = ProposalResult("UPDATED_RATING", proposal) - await ProposalStorage.put(result) - - return {} - - -@mutation.field("acceptProposal") -@login_required -async def accept_proposal(_, info, id): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - proposal = session.query(Proposal).filter(Proposal.id == id).first() - shout = session.query(Shout).filter(Shout.slug == proposal.shout).first() - authors = [author.id for author in shout.authors] - if not proposal: - return {"error": "invalid proposal id"} - if user_id not in authors: - return {"error": "access denied"} - - proposal.acceptedAt = datetime.now() - proposal.acceptedBy = user_id - session.commit() - - result = ProposalResult("ACCEPTED", proposal) - await ProposalStorage.put(result) - - return {} - -@mutation.field("declineProposal") -@login_required -async def decline_proposal(_, info, id): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - proposal = session.query(Proposal).filter(Proposal.id == id).first() - shout = session.query(Shout).filter(Shout.slug == proposal.shout).first() - authors = [author.id for author in shout.authors] - if not proposal: - return {"error": "invalid proposal id"} - if user_id not in authors: - return {"error": "access denied"} - - proposal.acceptedAt = datetime.now() - proposal.acceptedBy = user_id - session.commit() - - result = ProposalResult("DECLINED", proposal) - await ProposalStorage.put(result) - - return {} - @mutation.field("inviteAuthor") @login_required @@ -234,11 +22,10 @@ async def invite_author(_, info, author, shout): if author.id in authors: return {"error": "already added"} shout.authors.append(author) + shout.updated_at = datetime.now() + shout.save() session.commit() - # result = Result("INVITED") - # FIXME: await ShoutStorage.put(result) - # TODO: email notify return {} @@ -260,6 +47,8 @@ async def remove_author(_, info, author, shout): if author.id not in authors: return {"error": "not in authors"} shout.authors.remove(author) + shout.updated_at = datetime.now() + shout.save() session.commit() # result = Result("INVITED") diff --git a/resolvers/comments.py b/resolvers/comments.py deleted file mode 100644 index 9663590e..00000000 --- a/resolvers/comments.py +++ /dev/null @@ -1,136 +0,0 @@ -from orm import Comment, CommentRating -from orm.base import local_session -from orm.shout import ShoutCommentsSubscription -from orm.user import User -from resolvers.base import mutation, query -from auth.authenticate import login_required -from datetime import datetime - -def comments_subscribe(user, slug, auto = False): - with local_session() as session: - sub = session.query(ShoutCommentsSubscription).\ - filter(ShoutCommentsSubscription.subscriber == user.slug, ShoutCommentsSubscription.shout == slug).\ - first() - if auto and sub: - return - elif not auto and sub: - if not sub.deletedAt is None: - sub.deletedAt = None - sub.auto = False - session.commit() - return - raise Exception("subscription already exist") - - ShoutCommentsSubscription.create( - subscriber = user.slug, - shout = slug, - auto = auto) - -def comments_unsubscribe(user, slug): - with local_session() as session: - sub = session.query(ShoutCommentsSubscription).\ - filter(ShoutCommentsSubscription.subscriber == user.slug, ShoutCommentsSubscription.shout == slug).\ - first() - if not sub: - raise Exception("subscription not exist") - if sub.auto: - sub.deletedAt = datetime.now() - else: - session.delete(sub) - session.commit() - -@mutation.field("createComment") -@login_required -async def create_comment(_, info, body, shout, replyTo = None): - user = info.context["request"].user - - comment = Comment.create( - createdBy = user.slug, - body = body, - shout = shout, - replyTo = replyTo - ) - - try: - comments_subscribe(user, shout, True) - except Exception as e: - print(f"error on comment autosubscribe: {e}") - - return {"comment": comment} - -@mutation.field("updateComment") -@login_required -async def update_comment(_, info, id, body): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - comment = session.query(Comment).filter(Comment.id == id).first() - if not comment: - return {"error": "invalid comment id"} - if comment.createdBy != user_id: - return {"error": "access denied"} - - comment.body = body - comment.updatedAt = datetime.now() - - session.commit() - - return {"comment": comment} - -@mutation.field("deleteComment") -@login_required -async def delete_comment(_, info, id): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - comment = session.query(Comment).filter(Comment.id == id).first() - if not comment: - return {"error": "invalid comment id"} - if comment.createdBy != user_id: - return {"error": "access denied"} - - comment.deletedAt = datetime.now() - session.commit() - - return {} - -@mutation.field("rateComment") -@login_required -async def rate_comment(_, info, id, value): - auth = info.context["request"].auth - user_id = auth.user_id - - with local_session() as session: - user = session.query(User).filter(User.id == user_id).first() - comment = session.query(Comment).filter(Comment.id == id).first() - if not comment: - return {"error": "invalid comment id"} - rating = session.query(CommentRating).\ - filter(CommentRating.comment_id == id, CommentRating.createdBy == user.slug).first() - if rating: - rating.value = value - session.commit() - - if not rating: - CommentRating.create( - comment_id = id, - createdBy = user_id, - value = value) - - return {} - -def get_subscribed_shout_comments(slug): - with local_session() as session: - rows = session.query(ShoutCommentsSubscription.shout).\ - filter(ShoutCommentsSubscription.subscriber == slug,\ - ShoutCommentsSubscription.deletedAt == None).\ - all() - slugs = [row.shout for row in rows] - return slugs - -@query.field("commentsAll") -def get_top10_comments(_, info, page = 1, size = 10): - with local_session() as session: - rows = session.query(Comment).limit(size).all() diff --git a/resolvers/community.py b/resolvers/community.py index 10dffbcd..60d23677 100644 --- a/resolvers/community.py +++ b/resolvers/community.py @@ -1,5 +1,6 @@ -from orm import Community, CommunitySubscription +from orm.community import Community, CommunityFollower from orm.base import local_session +from orm.user import User from resolvers.base import mutation, query from auth.authenticate import login_required from datetime import datetime @@ -26,12 +27,15 @@ async def create_community(_, info, input): async def update_community(_, info, input): auth = info.context["request"].auth user_id = auth.user_id + community_slug = input.get('slug', '') with local_session() as session: - community = session.query(Community).filter(Community.slug == input.get('slug', '')).first() + owner = session.query(User).filter(User.id == user_id) # note list here + community = session.query(Community).filter(Community.slug == community_slug).first() + editors = [e.slug for e in community.editors] if not community: return {"error": "invalid community id"} - if community.createdBy != user_id: + if community.createdBy not in (owner + editors): return {"error": "access denied"} community.title = input.get('title', '') community.desc = input.get('desc', '') @@ -71,27 +75,28 @@ async def get_communities(_, info): communities = session.query(Community) return communities -def community_subscribe(user, slug): - CommunitySubscription.create( - subscriber = user.slug, +def community_follow(user, slug): + CommunityFollower.create( + follower = user.slug, community = slug ) -def community_unsubscribe(user, slug): +def community_unfollow(user, slug): with local_session() as session: - sub = session.query(CommunitySubscription).\ - filter(and_(CommunitySubscription.subscriber == user.slug, CommunitySubscription.community == slug)).\ + following = session.query(CommunityFollower).\ + filter(and_(CommunityFollower.follower == user.slug, CommunityFollower.community == slug)).\ first() - if not sub: - raise Exception("subscription not exist") - session.delete(sub) + if not following: + raise Exception("[orm.community] following was not exist") + session.delete(following) session.commit() -def get_subscribed_communities(user_slug): +@query.field("userFollowedCommunities") +def get_followed_communities(_, user_slug) -> list[Community]: + ccc = [] with local_session() as session: - rows = session.query(Community.slug).\ - join(CommunitySubscription).\ - where(CommunitySubscription.subscriber == user_slug).\ + ccc = session.query(Community.slug).\ + join(CommunityFollower).\ + where(CommunityFollower.follower == user_slug).\ all() - slugs = [row.slug for row in rows] - return slugs + return ccc diff --git a/resolvers/editor.py b/resolvers/editor.py index 49e2dccb..443db522 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -4,11 +4,10 @@ from orm.rbac import Resource from orm.shout import ShoutAuthor, ShoutTopic from orm.user import User from resolvers.base import mutation -from resolvers.comments import comments_subscribe +from resolvers.reactions import reactions_follow, reactions_unfollow from auth.authenticate import login_required from datetime import datetime - -from resolvers.zine import GitTask +from storages.gittask import GitTask @mutation.field("createShout") @@ -26,7 +25,7 @@ async def create_shout(_, info, input): user = user.slug ) - comments_subscribe(user, new_shout.slug, True) + reactions_follow(user, new_shout.slug, True) if "mainTopic" in input: topic_slugs.append(input["mainTopic"]) @@ -110,8 +109,10 @@ async def delete_shout(_, info, slug): return {"error": "invalid shout slug"} if user_id not in authors: return {"error": "access denied"} - + for a in authors: + reactions_unfollow(a.slug, slug, True) shout.deletedAt = datetime.now() session.commit() + return {} diff --git a/resolvers/feed.py b/resolvers/feed.py new file mode 100644 index 00000000..be07f1fb --- /dev/null +++ b/resolvers/feed.py @@ -0,0 +1,41 @@ +from auth.authenticate import login_required +from orm.base import local_session +from sqlalchemy import and_, desc, query +from orm.reaction import Reaction +from orm.shout import Shout, ShoutAuthor, ShoutTopic +from orm.topic import TopicFollower +from orm.user import AuthorFollower + +@query.field("shoutsForFeed") +@login_required +def get_user_feed(_, info, page, size) -> list[Shout]: + user = info.context["request"].user + shouts = [] + with local_session() as session: + shouts = session.query(Shout).\ + join(ShoutAuthor).\ + join(AuthorFollower).\ + where(AuthorFollower.follower == user.slug).\ + order_by(desc(Shout.createdAt)) + topicrows = session.query(Shout).\ + join(ShoutTopic).\ + join(TopicFollower).\ + where(TopicFollower.follower == user.slug).\ + order_by(desc(Shout.createdAt)) + shouts = shouts.union(topicrows).limit(size).offset(page * size).all() + return shouts + +@query.field("myCandidates") +@login_required +async def user_unpublished_shouts(_, info, page = 1, size = 10) -> list[Shout]: + user = info.context["request"].user + shouts = [] + with local_session() as session: + shouts = session.query(Shout).\ + join(ShoutAuthor).\ + where(and_(Shout.publishedAt == None, ShoutAuthor.user == user.slug)).\ + order_by(desc(Shout.createdAt)).\ + limit(size).\ + offset( page * size).\ + all() + return shouts diff --git a/resolvers/profile.py b/resolvers/profile.py index 1a8cb59a..5dcf562f 100644 --- a/resolvers/profile.py +++ b/resolvers/profile.py @@ -1,224 +1,154 @@ -from orm import User, UserRole, Role, UserRating -from orm.user import AuthorSubscription, UserStorage -from orm.comment import Comment +from orm.user import User, UserRole, Role, UserRating, AuthorFollower +from storages.users import UserStorage +from orm.shout import Shout +from orm.reaction import Reaction from orm.base import local_session -from orm.topic import Topic, TopicSubscription -from resolvers.base import mutation, query, subscription -from resolvers.community import get_subscribed_communities -from resolvers.comments import get_subscribed_shout_comments +from orm.topic import Topic, TopicFollower +from resolvers.base import mutation, query +from resolvers.community import get_followed_communities +from resolvers.reactions import get_shout_reactions from auth.authenticate import login_required - -from inbox_resolvers.inbox import get_total_unread_messages_for_user - -from sqlalchemy import func, and_, desc +from inbox_resolvers.inbox import get_inbox_counter +from sqlalchemy import and_, desc from sqlalchemy.orm import selectinload -import asyncio -def _get_user_subscribed_topic_slugs(slug): - with local_session() as session: - rows = session.query(Topic.slug).\ - join(TopicSubscription).\ - where(TopicSubscription.subscriber == slug).\ - all() - slugs = [row.slug for row in rows] - return slugs -def _get_user_subscribed_authors(slug): - with local_session() as session: - authors = session.query(User.slug).\ - join(AuthorSubscription, User.slug == AuthorSubscription.author).\ - where(AuthorSubscription.subscriber == slug) - return authors +@query.field("userReactedShouts") +async def get_user_reacted_shouts(_, info, slug, page, size) -> list[Shout]: + user = await UserStorage.get_user_by_slug(slug) + if not user: return {} + with local_session() as session: + shouts = session.query(Shout).\ + join(Reaction).\ + where(Reaction.createdBy == user.slug).\ + order_by(desc(Reaction.createdAt)).\ + limit(size).\ + offset(page * size).all() + return shouts + +@query.field("userFollowedTopics") +@login_required +def get_followed_topics(_, slug) -> list[Topic]: + rows = [] + with local_session() as session: + rows = session.query(Topic).\ + join(TopicFollower).\ + where(TopicFollower.follower == slug).\ + all() + return rows + + +@query.field("userFollowedAuthors") +def get_followed_authors(_, slug) -> list[User]: + authors = [] + with local_session() as session: + authors = session.query(User).\ + join(AuthorFollower, User.slug == AuthorFollower.author).\ + where(AuthorFollower.follower == slug).\ + all() + return authors + + +@query.field("userFollowers") +async def user_followers(_, slug) -> list[User]: + with local_session() as session: + users = session.query(User).\ + join(AuthorFollower, User.slug == AuthorFollower.follower).\ + where(AuthorFollower.author == slug).\ + all() + return users + +# for query.field("getCurrentUser") async def get_user_info(slug): - return { - "totalUnreadMessages" : await get_total_unread_messages_for_user(slug), - "userSubscribedTopics" : _get_user_subscribed_topic_slugs(slug), - "userSubscribedAuthors" : _get_user_subscribed_authors(slug), - "userSubscribedCommunities" : get_subscribed_communities(slug), - "userSubscribedShoutComments": get_subscribed_shout_comments(slug) - } + return { + "inbox": await get_inbox_counter(slug), + "topics": [t.slug for t in get_followed_topics(0, slug)], + "authors": [a.slug for a in get_followed_authors(0, slug)], + "reactions": [r.shout for r in get_shout_reactions(0, slug)], + "communities": [c.slug for c in get_followed_communities(0, slug)] + } + @query.field("getCurrentUser") @login_required async def get_current_user(_, info): - user = info.context["request"].user - return { - "user": user, - "info": await get_user_info(user.slug) - } + user = info.context["request"].user + return { + "user": user, + "info": await get_user_info(user.slug) + } + @query.field("getUsersBySlugs") async def get_users_by_slugs(_, info, slugs): - with local_session() as session: - users = session.query(User).\ - options(selectinload(User.ratings)).\ - filter(User.slug.in_(slugs)).all() - return users + with local_session() as session: + users = session.query(User).\ + options(selectinload(User.ratings)).\ + filter(User.slug.in_(slugs)).all() + return users + @query.field("getUserRoles") async def get_user_roles(_, info, slug): + with local_session() as session: + user = session.query(User).where(User.slug == slug).first() + roles = session.query(Role).\ + options(selectinload(Role.permissions)).\ + join(UserRole).\ + where(UserRole.user_id == user.id).all() + return roles - with local_session() as session: - user = session.query(User).where(User.slug == slug).first() - - roles = session.query(Role).\ - options(selectinload(Role.permissions)).\ - join(UserRole).\ - where(UserRole.user_id == user.id).all() - - return roles @mutation.field("updateProfile") @login_required async def update_profile(_, info, profile): - auth = info.context["request"].auth - user_id = auth.user_id + auth = info.context["request"].auth + user_id = auth.user_id + with local_session() as session: + user = session.query(User).filter(User.id == user_id).first() + user.update(profile) + session.commit() + return {} - with local_session() as session: - user = session.query(User).filter(User.id == user_id).first() - user.update(profile) - session.commit() - - return {} - -@query.field("userComments") -async def user_comments(_, info, slug, page, size): - user = await UserStorage.get_user_by_slug(slug) - if not user: - return - - page = page - 1 - with local_session() as session: - comments = session.query(Comment).\ - filter(Comment.createdBy == user.id).\ - order_by(desc(Comment.createdAt)).\ - limit(size).\ - offset(page * size) - - return comments - -@query.field("userSubscribedAuthors") -async def user_subscriptions(_, info, slug): - slugs = _get_user_subscribed_authors(slug) - return slugs - -@query.field("userSubscribers") -async def user_subscribers(_, info, slug): - with local_session() as session: - slugs = session.query(User.slug).\ - join(AuthorSubscription, User.slug == AuthorSubscription.subscriber).\ - where(AuthorSubscription.author == slug) - return slugs - -@query.field("userSubscribedTopics") -async def user_subscribed_topics(_, info, slug): - return _get_user_subscribed_topic_slugs(slug) @mutation.field("rateUser") @login_required async def rate_user(_, info, slug, value): - user = info.context["request"].user + user = info.context["request"].user + with local_session() as session: + rating = session.query(UserRating).\ + filter(and_(UserRating.rater == user.slug, UserRating.user == slug)).\ + first() + if rating: + rating.value = value + session.commit() + return {} + try: + UserRating.create( + rater=user.slug, + user=slug, + value=value + ) + except Exception as err: + return {"error": err} + return {} - with local_session() as session: - rating = session.query(UserRating).\ - filter(and_(UserRating.rater == user.slug, UserRating.user == slug)).\ - first() - - if rating: - rating.value = value - session.commit() - return {} - - UserRating.create( - rater = user.slug, - user = slug, - value = value - ) - - return {} - - -def author_subscribe(user, slug): - AuthorSubscription.create( - subscriber = user.slug, - author = slug - ) - -def author_unsubscribe(user, slug): - with local_session() as session: - sub = session.query(AuthorSubscription).\ - filter(and_(AuthorSubscription.subscriber == user.slug, AuthorSubscription.author == slug)).\ - first() - if not sub: - raise Exception("subscription not exist") - session.delete(sub) - session.commit() - -@query.field("shoutsRatedByUser") -@login_required -async def shouts_rated_by_user(_, info, page, size): - user = info.context["request"].user - - with local_session() as session: - shouts = session.query(Shout).\ - join(ShoutRating).\ - where(ShoutRating.rater == user.slug).\ - order_by(desc(ShoutRating.ts)).\ - limit(size).\ - offset( (page - 1) * size) - - return { - "shouts" : shouts - } - -@query.field("userUnpublishedShouts") -@login_required -async def user_unpublished_shouts(_, info, page, size): - user = info.context["request"].user - - with local_session() as session: - shouts = session.query(Shout).\ - join(ShoutAuthor).\ - where(and_(Shout.publishedAt == None, ShoutAuthor.user == user.slug)).\ - order_by(desc(Shout.createdAt)).\ - limit(size).\ - offset( (page - 1) * size) - - return { - "shouts" : shouts - } - -@query.field("shoutsReviewed") -@login_required -async def shouts_reviewed(_, info, page, size): - user = info.context["request"].user - with local_session() as session: - shouts_by_rating = session.query(Shout).\ - join(ShoutRating).\ - where(and_(Shout.publishedAt != None, ShoutRating.rater == user.slug)) - shouts_by_comment = session.query(Shout).\ - join(Comment).\ - where(and_(Shout.publishedAt != None, Comment.createdBy == user.id)) - shouts = shouts_by_rating.union(shouts_by_comment).\ - order_by(desc(Shout.publishedAt)).\ - limit(size).\ - offset( (page - 1) * size) - - return shouts - -@query.field("shoutsCommentedByUser") -async def shouts_commented_by_user(_, info, slug, page, size): - user = await UserStorage.get_user_by_slug(slug) - if not user: - return {} - - with local_session() as session: - shouts = session.query(Shout).\ - join(Comment).\ - where(Comment.createdBy == user.id).\ - order_by(desc(Comment.createdAt)).\ - limit(size).\ - offset( (page - 1) * size) - return shouts +# for mutation.field("follow") +def author_follow(user, slug): + AuthorFollower.create( + follower=user.slug, + author=slug + ) +# for mutation.field("unfollow") +def author_unfollow(user, slug): + with local_session() as session: + flw = session.query(AuthorFollower).\ + filter(and_(AuthorFollower.follower == user.slug, AuthorFollower.author == slug)).\ + first() + if not flw: + raise Exception("[resolvers.profile] follower not exist, cant unfollow") + else: + session.delete(flw) + session.commit() diff --git a/resolvers/reactions.py b/resolvers/reactions.py new file mode 100644 index 00000000..c63ea46d --- /dev/null +++ b/resolvers/reactions.py @@ -0,0 +1,150 @@ +from sqlalchemy import and_ +from sqlalchemy.orm import selectinload, joinedload +from orm.reaction import Reaction +from orm.base import local_session +from orm.shout import Shout, ShoutReactionsFollower +from orm.user import User +from resolvers.base import mutation, query +from auth.authenticate import login_required +from datetime import datetime +from storages.reactions import ReactionsStorage +from storages.viewed import ViewedStorage + + +def reactions_follow(user, slug, auto=False): + with local_session() as session: + fw = session.query(ShoutReactionsFollower).\ + filter(ShoutReactionsFollower.follower == user.slug, ShoutReactionsFollower.shout == slug).\ + first() + if auto and fw: + return + elif not auto and fw: + if not fw.deletedAt is None: + fw.deletedAt = None + fw.auto = False + session.commit() + return + # print("[resolvers.reactions] was followed before") + + ShoutReactionsFollower.create( + follower=user.slug, + shout=slug, + auto=auto) + + +def reactions_unfollow(user, slug): + with local_session() as session: + following = session.query(ShoutReactionsFollower).\ + filter(ShoutReactionsFollower.follower == user.slug, ShoutReactionsFollower.shout == slug).\ + first() + if not following: + # print("[resolvers.reactions] was not followed", slug) + return + if following.auto: + following.deletedAt = datetime.now() + else: + session.delete(following) + session.commit() + + +@mutation.field("createReaction") +@login_required +async def create_reaction(_, info, inp): + user = info.context["request"].user + + reaction = Reaction.create(**inp) + + try: + reactions_follow(user, inp['shout'], True) + except Exception as e: + print(f"[resolvers.reactions] error on reactions autofollowing: {e}") + + return {"reaction": reaction} + + +@mutation.field("updateReaction") +@login_required +async def update_reaction(_, info, inp): + auth = info.context["request"].auth + user_id = auth.user_id + + with local_session() as session: + user = session.query(User).filter(User.id == user_id).first() + reaction = session.query(Reaction).filter(Reaction.id == id).first() + if not reaction: + return {"error": "invalid reaction id"} + if reaction.createdBy != user.slug: + return {"error": "access denied"} + reaction.body = inp['body'] + reaction.updatedAt = datetime.now() + if reaction.kind != inp['kind']: + # TODO: change mind detection + pass + if inp.get('range'): + reaction.range = inp.get('range') + session.commit() + + return {"reaction": reaction} + + +@mutation.field("deleteReaction") +@login_required +async def delete_reaction(_, info, id): + auth = info.context["request"].auth + user_id = auth.user_id + with local_session() as session: + user = session.query(User).filter(User.id == user_id).first() + reaction = session.query(Reaction).filter(Reaction.id == id).first() + if not reaction: + return {"error": "invalid reaction id"} + if reaction.createdBy != user.slug: + return {"error": "access denied"} + reaction.deletedAt = datetime.now() + session.commit() + return {} + +@query.field("reactionsByShout") +def get_shout_reactions(_, info, slug) -> list[Shout]: + shouts = [] + with local_session() as session: + shoutslugs = session.query(ShoutReactionsFollower.shout).\ + join(User).where(Reaction.createdBy == User.slug).\ + filter(ShoutReactionsFollower.follower == slug, + ShoutReactionsFollower.deletedAt == None).all() + shoutslugs = list(set(shoutslugs)) + shouts = session.query(Shout).filter(Shout.slug in shoutslugs).all() + return shouts + + +@query.field("reactionsAll") +def get_all_reactions(_, info, page=1, size=10) -> list[Reaction]: + reactions = [] + with local_session() as session: + q = session.query(Reaction).\ + options( + joinedload(User), + joinedload(Shout) + ).\ + join( User, Reaction.createdBy == User.slug ).\ + join( Shout, Reaction.shout == Shout.slug ).\ + filter( Reaction.deletedAt == None ).\ + limit(size).offset(page * size).all() + # print(reactions[0].dict()) + return reactions + + +@query.field("reactionsByAuthor") +def get_reactions_by_author(_, info, slug, page=1, size=50) -> list[Reaction]: + reactions = [] + with local_session() as session: + reactions = session.query(Reaction).\ + join(Shout).where(Reaction.shout == Shout.slug).\ + filter(Reaction.deletedAt == None, Reaction.createdBy == slug).\ + limit(size).offset(page * size).all() # pagination + return reactions + + +@mutation.field("viewReaction") +async def view_reaction(_, info, reaction): + await ViewedStorage.inc_reaction(reaction) + return {"error" : ""} \ No newline at end of file diff --git a/resolvers/topics.py b/resolvers/topics.py index 49123b40..b4c59a7c 100644 --- a/resolvers/topics.py +++ b/resolvers/topics.py @@ -1,17 +1,16 @@ -from orm import Topic, TopicSubscription, TopicStorage, Shout, User -from orm.shout import TopicStat, ShoutAuthorStorage -from orm.user import UserStorage +from orm.topic import Topic, TopicFollower +from storages.topics import TopicStorage +from orm.shout import Shout +from orm.user import User +from storages.topicstat import TopicStat from orm.base import local_session from resolvers.base import mutation, query from auth.authenticate import login_required -import asyncio - -from sqlalchemy import func, and_ +from sqlalchemy import and_ @query.field("topicsAll") -async def topics_by_slugs(_, info, slugs = None): - with local_session() as session: - topics = await TopicStorage.get_topics(slugs) +async def topics_by_slugs(_, info, page = 1, size = 50): + topics = await TopicStorage.get_topics_all(page, size) all_fields = [node.name.value for node in info.field_nodes[0].selection_set.selections] if "stat" in all_fields: for topic in topics: @@ -20,8 +19,7 @@ async def topics_by_slugs(_, info, slugs = None): @query.field("topicsByCommunity") async def topics_by_community(_, info, community): - with local_session() as session: - topics = await TopicStorage.get_topics_by_community(community) + topics = await TopicStorage.get_topics_by_community(community) all_fields = [node.name.value for node in info.field_nodes[0].selection_set.selections] if "stat" in all_fields: for topic in topics: @@ -65,17 +63,17 @@ async def update_topic(_, info, input): return { "topic" : topic } -def topic_subscribe(user, slug): - TopicSubscription.create( - subscriber = user.slug, +def topic_follow(user, slug): + TopicFollower.create( + follower = user.slug, topic = slug) -def topic_unsubscribe(user, slug): +def topic_unfollow(user, slug): with local_session() as session: - sub = session.query(TopicSubscription).\ - filter(and_(TopicSubscription.subscriber == user.slug, TopicSubscription.topic == slug)).\ + sub = session.query(TopicFollower).\ + filter(and_(TopicFollower.follower == user.slug, TopicFollower.topic == slug)).\ first() if not sub: - raise Exception("subscription not exist") + raise Exception("[resolvers.topics] follower not exist") session.delete(sub) session.commit() diff --git a/resolvers/zine.py b/resolvers/zine.py index b74a4139..4f1f6024 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -1,216 +1,18 @@ -from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, \ - User, Community, Resource, ShoutRatingStorage, ShoutViewStorage, \ - Comment, CommentRating, Topic, ShoutCommentsSubscription -from orm.community import CommunitySubscription +from orm.shout import Shout, ShoutAuthor, ShoutTopic +from orm.topic import Topic from orm.base import local_session -from orm.user import UserStorage, AuthorSubscription -from orm.topic import TopicSubscription - from resolvers.base import mutation, query -from resolvers.profile import author_subscribe, author_unsubscribe -from resolvers.topics import topic_subscribe, topic_unsubscribe -from resolvers.community import community_subscribe, community_unsubscribe -from resolvers.comments import comments_subscribe, comments_unsubscribe +from storages.shoutscache import ShoutsCache +from storages.viewed import ViewedStorage +from resolvers.profile import author_follow, author_unfollow +from resolvers.topics import topic_follow, topic_unfollow +from resolvers.community import community_follow, community_unfollow +from resolvers.reactions import reactions_follow, reactions_unfollow from auth.authenticate import login_required -from settings import SHOUTS_REPO - -import subprocess -import asyncio -from datetime import datetime, timedelta - -from pathlib import Path -from sqlalchemy import select, func, desc, and_ +from sqlalchemy import select, desc, and_ from sqlalchemy.orm import selectinload -class GitTask: - - queue = asyncio.Queue() - - def __init__(self, input, username, user_email, comment): - self.slug = input["slug"] - self.shout_body = input["body"] - self.username = username - self.user_email = user_email - self.comment = comment - - GitTask.queue.put_nowait(self) - - def init_repo(self): - repo_path = "%s" % (SHOUTS_REPO) - - Path(repo_path).mkdir() - - cmd = "cd %s && git init && " \ - "git config user.name 'discours' && " \ - "git config user.email 'discours@discours.io' && " \ - "touch initial && git add initial && " \ - "git commit -m 'init repo'" \ - % (repo_path) - output = subprocess.check_output(cmd, shell=True) - print(output) - - def execute(self): - repo_path = "%s" % (SHOUTS_REPO) - - if not Path(repo_path).exists(): - self.init_repo() - - #cmd = "cd %s && git checkout master" % (repo_path) - #output = subprocess.check_output(cmd, shell=True) - #print(output) - - shout_filename = "%s.mdx" % (self.slug) - shout_full_filename = "%s/%s" % (repo_path, shout_filename) - with open(shout_full_filename, mode='w', encoding='utf-8') as shout_file: - shout_file.write(bytes(self.shout_body,'utf-8').decode('utf-8','ignore')) - - author = "%s <%s>" % (self.username, self.user_email) - cmd = "cd %s && git add %s && git commit -m '%s' --author='%s'" % \ - (repo_path, shout_filename, self.comment, author) - output = subprocess.check_output(cmd, shell=True) - print(output) - - @staticmethod - async def git_task_worker(): - print("[git.task] worker start") - while True: - task = await GitTask.queue.get() - try: - task.execute() - except Exception as err: - print("[git.task] worker error = %s" % (err)) - - -class ShoutsCache: - limit = 200 - period = 60*60 #1 hour - lock = asyncio.Lock() - - @staticmethod - async def prepare_recent_published(): - with local_session() as session: - stmt = select(Shout).\ - options(selectinload(Shout.authors), selectinload(Shout.topics)).\ - where(Shout.publishedAt != None).\ - order_by(desc("publishedAt")).\ - limit(ShoutsCache.limit) - shouts = [] - for row in session.execute(stmt): - shout = row.Shout - shout.ratings = await ShoutRatingStorage.get_ratings(shout.slug) - shouts.append(shout) - async with ShoutsCache.lock: - ShoutsCache.recent_published = shouts - - @staticmethod - async def prepare_recent_all(): - with local_session() as session: - stmt = select(Shout).\ - options(selectinload(Shout.authors), selectinload(Shout.topics)).\ - order_by(desc("createdAt")).\ - limit(ShoutsCache.limit) - shouts = [] - for row in session.execute(stmt): - shout = row.Shout - shout.ratings = await ShoutRatingStorage.get_ratings(shout.slug) - shouts.append(shout) - async with ShoutsCache.lock: - ShoutsCache.recent_all = shouts - - @staticmethod - async def prepare_recent_commented(): - with local_session() as session: - stmt = select(Shout, func.max(Comment.createdAt).label("commentCreatedAt")).\ - options(selectinload(Shout.authors), selectinload(Shout.topics)).\ - join(Comment).\ - where(and_(Shout.publishedAt != None, Comment.deletedAt == None)).\ - group_by(Shout.slug).\ - order_by(desc("commentCreatedAt")).\ - limit(ShoutsCache.limit) - shouts = [] - for row in session.execute(stmt): - shout = row.Shout - shout.ratings = await ShoutRatingStorage.get_ratings(shout.slug) - shouts.append(shout) - async with ShoutsCache.lock: - ShoutsCache.recent_commented = shouts - - - @staticmethod - async def prepare_top_overall(): - with local_session() as session: - stmt = select(Shout, func.sum(ShoutRating.value).label("rating")).\ - options(selectinload(Shout.authors), selectinload(Shout.topics)).\ - join(ShoutRating).\ - where(Shout.publishedAt != None).\ - group_by(Shout.slug).\ - order_by(desc("rating")).\ - limit(ShoutsCache.limit) - shouts = [] - for row in session.execute(stmt): - shout = row.Shout - shout.ratings = await ShoutRatingStorage.get_ratings(shout.slug) - shouts.append(shout) - async with ShoutsCache.lock: - ShoutsCache.top_overall = shouts - - @staticmethod - async def prepare_top_month(): - month_ago = datetime.now() - timedelta(days = 30) - with local_session() as session: - stmt = select(Shout, func.sum(ShoutRating.value).label("rating")).\ - options(selectinload(Shout.authors), selectinload(Shout.topics)).\ - join(ShoutRating).\ - where(and_(Shout.createdAt > month_ago, Shout.publishedAt != None)).\ - group_by(Shout.slug).\ - order_by(desc("rating")).\ - limit(ShoutsCache.limit) - shouts = [] - for row in session.execute(stmt): - shout = row.Shout - shout.ratings = await ShoutRatingStorage.get_ratings(shout.slug) - shouts.append(shout) - async with ShoutsCache.lock: - ShoutsCache.top_month = shouts - - @staticmethod - async def prepare_top_viewed(): - month_ago = datetime.now() - timedelta(days = 30) - with local_session() as session: - stmt = select(Shout, func.sum(ShoutViewByDay.value).label("views")).\ - options(selectinload(Shout.authors), selectinload(Shout.topics)).\ - join(ShoutViewByDay).\ - where(and_(ShoutViewByDay.day > month_ago, Shout.publishedAt != None)).\ - group_by(Shout.slug).\ - order_by(desc("views")).\ - limit(ShoutsCache.limit) - shouts = [] - for row in session.execute(stmt): - shout = row.Shout - shout.ratings = await ShoutRatingStorage.get_ratings(shout.slug) - shout.views = row.views - shouts.append(shout) - async with ShoutsCache.lock: - ShoutsCache.top_viewed = shouts - - @staticmethod - async def worker(): - print("[shouts.cache] worker start") - while True: - try: - print("[shouts.cache] updating...") - await ShoutsCache.prepare_top_month() - await ShoutsCache.prepare_top_overall() - await ShoutsCache.prepare_top_viewed() - await ShoutsCache.prepare_recent_published() - await ShoutsCache.prepare_recent_all() - await ShoutsCache.prepare_recent_commented() - print("[shouts.cache] update finished") - except Exception as err: - print("[shouts.cache] worker error: %s" % (err)) - await asyncio.sleep(ShoutsCache.period) - @query.field("topViewed") async def top_viewed(_, info, page, size): async with ShoutsCache.lock: @@ -236,20 +38,20 @@ async def recent_all(_, info, page, size): async with ShoutsCache.lock: return ShoutsCache.recent_all[(page - 1) * size : page * size] -@query.field("recentCommented") -async def recent_commented(_, info, page, size): +@query.field("recentReacted") +async def recent_reacted(_, info, page, size): async with ShoutsCache.lock: - return ShoutsCache.recent_commented[(page - 1) * size : page * size] + return ShoutsCache.recent_reacted[(page - 1) * size : page * size] @mutation.field("viewShout") async def view_shout(_, info, slug): - await ShoutViewStorage.inc_view(slug) + await ViewedStorage.inc_shout(slug) return {"error" : ""} @query.field("getShoutBySlug") async def get_shout_by_slug(_, info, slug): all_fields = [node.name.value for node in info.field_nodes[0].selection_set.selections] - selected_fields = set(["authors", "topics"]).intersection(all_fields) + selected_fields = set(["authors", "topics", "reactions"]).intersection(all_fields) select_options = [selectinload(getattr(Shout, field)) for field in selected_fields] with local_session() as session: @@ -258,23 +60,11 @@ async def get_shout_by_slug(_, info, slug): filter(Shout.slug == slug).first() if not shout: - print(f"shout with slug {slug} not exist") + print(f"[resolvers.zine] error: shout with slug {slug} not exist") return {} #TODO return error field - - shout.ratings = await ShoutRatingStorage.get_ratings(slug) + return shout -@query.field("getShoutComments") -async def get_shout_comments(_, info, slug): - with local_session() as session: - comments = session.query(Comment).\ - options(selectinload(Comment.ratings)).\ - filter(Comment.shout == slug).\ - group_by(Comment.id).all() - for comment in comments: - comment.createdBy = await UserStorage.get_user(comment.createdBy) - return comments - @query.field("shoutsByTopics") async def shouts_by_topics(_, info, slugs, page, size): page = page - 1 @@ -316,65 +106,39 @@ async def shouts_by_communities(_, info, slugs, page, size): offset(page * size) return shouts -@mutation.field("subscribe") +@mutation.field("follow") @login_required -async def subscribe(_, info, what, slug): +async def follow(_, info, what, slug): user = info.context["request"].user - try: if what == "AUTHOR": - author_subscribe(user, slug) + author_follow(user, slug) elif what == "TOPIC": - topic_subscribe(user, slug) + topic_follow(user, slug) elif what == "COMMUNITY": - community_subscribe(user, slug) - elif what == "COMMENTS": - comments_subscribe(user, slug) + community_follow(user, slug) + elif what == "REACTIONS": + reactions_follow(user, slug) except Exception as e: return {"error" : str(e)} return {} -@mutation.field("unsubscribe") +@mutation.field("unfollow") @login_required -async def unsubscribe(_, info, what, slug): +async def unfollow(_, info, what, slug): user = info.context["request"].user try: if what == "AUTHOR": - author_unsubscribe(user, slug) + author_unfollow(user, slug) elif what == "TOPIC": - topic_unsubscribe(user, slug) + topic_unfollow(user, slug) elif what == "COMMUNITY": - community_unsubscribe(user, slug) - elif what == "COMMENTS": - comments_unsubscribe(user, slug) + community_unfollow(user, slug) + elif what == "REACTIONS": + reactions_unfollow(user, slug) except Exception as e: return {"error" : str(e)} return {} - - -@mutation.field("rateShout") -@login_required -async def rate_shout(_, info, slug, value): - auth = info.context["request"].auth - user = info.context["request"].user - - with local_session() as session: - rating = session.query(ShoutRating).\ - filter(and_(ShoutRating.rater == user.slug, ShoutRating.shout == slug)).first() - if rating: - rating.value = value; - rating.ts = datetime.now() - session.commit() - else: - rating = ShoutRating.create( - rater = user.slug, - shout = slug, - value = value - ) - - await ShoutRatingStorage.update_rating(rating) - - return {"error" : ""} diff --git a/schema.graphql b/schema.graphql index 13f842e0..bafd9e0e 100644 --- a/schema.graphql +++ b/schema.graphql @@ -1,17 +1,13 @@ scalar DateTime -################################### Payload - -type Result { - error: String -} +################################### Payload ################################### type CurrentUserInfo { - totalUnreadMessages: Int - userSubscribedTopics: [String]! - userSubscribedAuthors: [String]! - userSubscribedCommunities: [String]! - userSubscribedShoutComments: [String]! + inbox: Int + topics: [String]! + authors: [String]! + reactions: [String]! + communities: [String]! } type AuthResult { @@ -21,12 +17,36 @@ type AuthResult { info: CurrentUserInfo } -type UserResult { +type Result { error: String - user: User - info: CurrentUserInfo + shout: Shout + shouts: [Shout] + author: User + authors: [User] + reaction: Reaction + reactions: [Reaction] + topic: Topic + topics: [Topic] + community: Community + communities: [Community] } +enum ReactionStatus { + NEW + UPDATED + CHANGED + EXPLAINED + DELETED +} + +type ReactionUpdating { + error: String + status: ReactionStatus + reaction: Reaction +} + +################################### Inputs ################################### + input ShoutInput { slug: String! body: String! @@ -53,53 +73,29 @@ input CommunityInput { pic: String } -type ShoutResult { - error: String - shout: Shout -} - -type ShoutsResult { - error: String - shouts: [Shout] -} - -type CommentResult { - error: String - comment: Comment -} - input TopicInput { slug: String! + community: String! title: String body: String pic: String children: [String] - community: String! + parents: [String] } -type TopicResult { - error: String - topic: Topic +input ReactionInput { + kind: Int! + shout: String! + range: String + body: String + replyTo: Int } -enum CommentStatus { - NEW - UPDATED - UPDATED_RATING - DELETED -} - -type CommentUpdatedResult { - error: String - status: CommentStatus - comment: Comment -} - -enum SubscriptionType { +enum FollowingEntity { TOPIC AUTHOR COMMUNITY - COMMENTS + REACTIONS } ################################### Mutation @@ -113,11 +109,11 @@ type Mutation { # requestEmailConfirmation: User! # shout - createShout(input: ShoutInput!): ShoutResult! - updateShout(input: ShoutInput!): ShoutResult! + createShout(input: ShoutInput!): Result! + updateShout(input: ShoutInput!): Result! deleteShout(slug: String!): Result! - rateShout(slug: String!, value: Int!): Result! viewShout(slug: String!): Result! + viewReaction(reaction_id: Int!): Result! # user profile rateUser(slug: String!, value: Int!): Result! @@ -125,35 +121,35 @@ type Mutation { updateProfile(profile: ProfileInput!): Result! # topics - createTopic(input: TopicInput!): TopicResult! - updateTopic(input: TopicInput!): TopicResult! + createTopic(input: TopicInput!): Result! + # TODO: mergeTopics(t1: String!, t2: String!): Result! + updateTopic(input: TopicInput!): Result! + destroyTopic(slug: String!): Result! - # comments - createComment(body: String!, shout: String!, replyTo: Int): CommentResult! - updateComment(id: Int!, body: String!): CommentResult! - deleteComment(id: Int!): Result! - rateComment(id: Int!, value: Int!): Result! + + # reactions + createReaction(input: ReactionInput!): Result! + updateReaction(id: Int!, body: String!): Result! + deleteReaction(id: Int!): Result! + rateReaction(id: Int!, value: Int!): Result! # community - createCommunity(community: CommunityInput!): Community! - updateCommunity(community: CommunityInput!): Community! + createCommunity(community: CommunityInput!): Result! + updateCommunity(community: CommunityInput!): Result! deleteCommunity(slug: String!): Result! # collab inviteAuthor(author: String!, shout: String!): Result! removeAuthor(author: String!, shout: String!): Result! - - # proposal - createProposal(body: String!, range: String): Proposal! - updateProposal(body: String!, range: String): Proposal! - acceptProposal(id: Int!): Result! - declineProposal(id: Int!): Result! - disableProposal(id: Int!): Result! - deleteProposal(id: Int!): Result! - rateProposal(id: Int!): Result! - subscribe(what: SubscriptionType!, slug: String!): Result! - unsubscribe(what: SubscriptionType!, slug: String!): Result! + # following + follow(what: FollowingEntity!, slug: String!): Result! + unfollow(what: FollowingEntity!, slug: String!): Result! + + # TODO: transform reaction with body to shout + + # NOTE: so-named 'collections' are tuned feeds + # TODO: Feed entity and CRUM: createFeed updateFeed deleteFeed mergeFeeds } ################################### Query @@ -163,69 +159,53 @@ type Query { # auth isEmailUsed(email: String!): Boolean! signIn(email: String!, password: String): AuthResult! - signOut: Result! - forget(email: String!): Result! - requestPasswordReset(email: String!): Result! - updatePassword(password: String!, token: String!): Result! + signOut: AuthResult! + forget(email: String!): AuthResult! + requestPasswordReset(email: String!): AuthResult! + updatePassword(password: String!, token: String!): AuthResult! + getCurrentUser: AuthResult! # profile - userSubscribers(slug: String!): [String]! - userSubscribedAuthors(slug: String!): [String]! - userSubscribedTopics(slug: String!): [String]! - getCurrentUser: UserResult! getUsersBySlugs(slugs: [String]!): [User]! + userFollowers(slug: String!): [User]! + userFollowedAuthors(slug: String!): [User]! + userFollowedTopics(slug: String!): [Topic]! + userFollowedCommunities(slug: String!): [Community]! + userReactedShouts(slug: String!): [Shout]! # test getUserRoles(slug: String!): [Role]! # shouts getShoutBySlug(slug: String!): Shout! + shoutsForFeed(page: Int!, size: Int!): [Shout]! # test shoutsByTopics(slugs: [String]!, page: Int!, size: Int!): [Shout]! shoutsByAuthors(slugs: [String]!, page: Int!, size: Int!): [Shout]! shoutsByCommunities(slugs: [String]!, page: Int!, size: Int!): [Shout]! - shoutsRatedByUser(page: Int!, size: Int!): ShoutsResult! - shoutsReviewed(page: Int!, size: Int!): [Shout]! - userUnpublishedShouts(page: Int!, size: Int!): ShoutsResult! - shoutsCommentedByUser(page: Int!, size: Int!): ShoutsResult! - recentCommented(page: Int!, size: Int!): [Shout]! - - # comments - getShoutComments(slug: String!): [Comment]! - getAllComments: [Comment]! # top10 - userComments(slug: String!, page: Int!, size: Int!): [Comment]! - - # collab - getShoutProposals(slug: String!): [Proposal]! - createProposal(body: String!, range: String): Proposal! - updateProposal(body: String!, range: String): Proposal! - destroyProposal(id: Int!): Result! - inviteAuthor(slug: String!, author: String!): Result! - removeAuthor(slug: String!, author: String!): Result! - - # mainpage articles' feeds + myCandidates(page: Int!, size: Int!): [Shout]! # test topViewed(page: Int!, size: Int!): [Shout]! + # TODO: topReacted(page: Int!, size: Int!): [Shout]! topMonth(page: Int!, size: Int!): [Shout]! topOverall(page: Int!, size: Int!): [Shout]! - recentPublished(page: Int!, size: Int!): [Shout]! - - # all articles' feed + recentPublished(page: Int!, size: Int!): [Shout]! # homepage + recentReacted(page: Int!, size: Int!): [Shout]! # test recentAll(page: Int!, size: Int!): [Shout]! - commentsAll(page: Int!, size: Int!): [Comment]! - # NOTE: so-named 'collections' are tuned feeds - # TODO: createFeed updateFeed deleteFeed mergeFeeds + + # reactons + reactionsAll(page: Int!, size: Int!): [Reaction]! + reactionsByAuthor(slug: String!, page: Int!, size: Int!): [Reaction]! + reactionsByShout(slug: String!): [Reaction]! + + # collab + inviteAuthor(slug: String!, author: String!): Result! + removeAuthor(slug: String!, author: String!): Result # topics topicsAll(page: Int!, size: Int!): [Topic]! topicsByCommunity(community: String!): [Topic]! topicsByAuthor(author: String!): [Topic]! - # TODO: CMUD for topic - # createTopic(input: TopicInput!): TopicResult! - # mergeTopics(t1: String!, t2: String!): Result! - # updateTopic(input: TopicInput!): TopicResult! - # destroyTopic(slug: String!): Result! # communities getCommunity(slug: String): Community! - getCommunities: [Community]! - # TODO: getCommunityMembers(slug: String!): [User]! + getCommunities: [Community]! # all } ############################################ Subscription @@ -234,7 +214,7 @@ type Subscription { onlineUpdated: [User!]! shoutUpdated: Shout! userUpdated: User! - commentUpdated(shout: String!): CommentUpdatedResult! + reactionUpdated(shout: String!): ReactionUpdating! } ############################################ Entities @@ -302,28 +282,42 @@ type User { oid: String } -type Comment { +enum ReactionKind { + LIKE + DISLIKE + + AGREE + DISAGREE + + PROOF + DISPROOF + + COMMENT + QOUTE + + PROPOSE + ASK + + ACCEPT + REJECT +} + +type Reaction { id: Int! - createdBy: User! - body: String! - replyTo: Comment! - createdAt: DateTime! - updatedAt: DateTime shout: Shout! + createdAt: DateTime! + createdBy: User! + updatedAt: DateTime deletedAt: DateTime deletedBy: User - ratings: [CommentRating] - views: Int - oid: String + range: String # full / 0:2340 + kind: ReactionKind! + body: String + replyTo: Reaction + stat: Stat + old_id: String old_thread: String } -type CommentRating { - id: Int! - comment_id: Int! - createdBy: String! - createdAt: DateTime! - value: Int! -} # is publication type Shout { @@ -332,7 +326,7 @@ type Shout { body: String! createdAt: DateTime! authors: [User!]! - ratings: [Rating] + # ratings: [Rating] community: String cover: String layout: String @@ -349,13 +343,12 @@ type Shout { deletedBy: User publishedBy: User publishedAt: DateTime - stat: ShoutStat + stat: Stat } -type ShoutStat { - views: Int! - comments: Int! - ratings: Int! +type Stat { + viewed: Int! + reacted: Int! } type Community { @@ -369,9 +362,9 @@ type Community { type TopicStat { shouts: Int! - views: Int! - subscriptions: Int! + followers: Int! authors: Int! + viewed: Int! } type Topic { @@ -381,36 +374,11 @@ type Topic { pic: String parents: [String] # NOTE: topic can have parent topics children: [String] # and children - community: String! + community: Community! stat: TopicStat oid: String } -enum ProposalStatus { - NEW - UPDATED - UPDATED_RATING - ACCEPTED - DECLINED - DISABLED - DELETED -} - -type Proposal { - shout: String! - range: String # full / 0:2340 - body: String! - createdAt: DateTime! - createdBy: String! - updatedAt: DateTime - acceptedAt: DateTime - acceptedBy: Int - declinedAt: DateTime - declinedBy: Int - disabledAt: DateTime - disabledBy: Int -} - type Token { createdAt: DateTime! expiresAt: DateTime diff --git a/storages/gittask.py b/storages/gittask.py new file mode 100644 index 00000000..ecf0c692 --- /dev/null +++ b/storages/gittask.py @@ -0,0 +1,62 @@ +import subprocess +from pathlib import Path +import asyncio +from settings import SHOUTS_REPO + +class GitTask: + ''' every shout update use a new task ''' + queue = asyncio.Queue() + + def __init__(self, input, username, user_email, comment): + self.slug = input["slug"] + self.shout_body = input["body"] + self.username = username + self.user_email = user_email + self.comment = comment + + GitTask.queue.put_nowait(self) + + def init_repo(self): + repo_path = "%s" % (SHOUTS_REPO) + + Path(repo_path).mkdir() + + cmd = "cd %s && git init && " \ + "git config user.name 'discours' && " \ + "git config user.email 'discours@discours.io' && " \ + "touch initial && git add initial && " \ + "git commit -m 'init repo'" \ + % (repo_path) + output = subprocess.check_output(cmd, shell=True) + print(output) + + def execute(self): + repo_path = "%s" % (SHOUTS_REPO) + + if not Path(repo_path).exists(): + self.init_repo() + + #cmd = "cd %s && git checkout master" % (repo_path) + #output = subprocess.check_output(cmd, shell=True) + #print(output) + + shout_filename = "%s.mdx" % (self.slug) + shout_full_filename = "%s/%s" % (repo_path, shout_filename) + with open(shout_full_filename, mode='w', encoding='utf-8') as shout_file: + shout_file.write(bytes(self.shout_body,'utf-8').decode('utf-8','ignore')) + + author = "%s <%s>" % (self.username, self.user_email) + cmd = "cd %s && git add %s && git commit -m '%s' --author='%s'" % \ + (repo_path, shout_filename, self.comment, author) + output = subprocess.check_output(cmd, shell=True) + print(output) + + @staticmethod + async def git_task_worker(): + print("[resolvers.git] worker start") + while True: + task = await GitTask.queue.get() + try: + task.execute() + except Exception as err: + print("[resolvers.git] worker error: %s" % (err)) diff --git a/storages/reactions.py b/storages/reactions.py new file mode 100644 index 00000000..e1f9c9c3 --- /dev/null +++ b/storages/reactions.py @@ -0,0 +1,152 @@ +import asyncio +from sqlalchemy import and_, desc, func +from orm.base import local_session +from orm.reaction import Reaction, ReactionKind +from orm.topic import ShoutTopic + + +def kind_to_rate(kind) -> int: + if kind in [ + ReactionKind.AGREE, + ReactionKind.LIKE, + ReactionKind.PROOF, + ReactionKind.ACCEPT + ]: return 1 + elif kind in [ + ReactionKind.DISAGREE, + ReactionKind.DISLIKE, + ReactionKind.DISPROOF, + ReactionKind.REJECT + ]: return -1 + else: return 0 + +class ReactionsStorage: + limit = 200 + reactions = [] + rating_by_shout = {} + reactions_by_shout = {} + reactions_by_topic = {} # TODO: get sum reactions for all shouts in topic + reactions_by_author = {} + lock = asyncio.Lock() + period = 3*60 # 3 mins + + @staticmethod + async def prepare_all(session): + # FIXME + stmt = session.query(Reaction).\ + filter(Reaction.deletedAt == None).\ + order_by(desc("createdAt")).\ + limit(ReactionsStorage.limit) + reactions = [] + for row in session.execute(stmt): + reaction = row.Reaction + reactions.append(reaction) + async with ReactionsStorage.lock: + print("[storage.reactions] %d recently published reactions " % len(reactions)) + ReactionsStorage.reactions = reactions + + @staticmethod + async def prepare_by_author(session): + try: + # FIXME + by_authors = session.query(Reaction.createdBy, func.count('*').label("count")).\ + where(and_(Reaction.deletedAt == None)).\ + group_by(Reaction.createdBy).all() + except Exception as e: + print(e) + by_authors = {} + async with ReactionsStorage.lock: + ReactionsStorage.reactions_by_author = dict([stat for stat in by_authors]) + print("[storage.reactions] %d recently reacted users" % len(by_authors)) + + @staticmethod + async def prepare_by_shout(session): + try: + # FIXME + by_shouts = session.query(Reaction.shout, func.count('*').label("count")).\ + where(and_(Reaction.deletedAt == None)).\ + group_by(Reaction.shout).all() + except Exception as e: + print(e) + by_shouts = {} + async with ReactionsStorage.lock: + ReactionsStorage.reactions_by_shout = dict([stat for stat in by_shouts]) + print("[storage.reactions] %d recently reacted shouts" % len(by_shouts)) + + @staticmethod + async def calc_ratings(session): + rating_by_shout = {} + for shout in ReactionsStorage.reactions_by_shout.keys(): + rating_by_shout[shout] = 0 + shout_reactions_by_kinds = session.query(Reaction).\ + where(and_(Reaction.deletedAt == None, Reaction.shout == shout)).\ + group_by(Reaction.kind) + for kind, reactions in shout_reactions_by_kinds: + rating_by_shout[shout] += len(reactions) * kind_to_rate(kind) + async with ReactionsStorage.lock: + ReactionsStorage.rating_by_shout = rating_by_shout + + @staticmethod + async def prepare_by_topic(session): + by_topics = session.query(Reaction.shout, func.count('*').label("count")).\ + filter(Reaction.deletedAt == None).\ + join(ShoutTopic, ShoutTopic.shout == Reaction.shout).\ + order_by(desc("count")).\ + group_by(ShoutTopic.topic).all() + reactions_by_topic = {} + for stat in by_topics: + if not reactions_by_topic.get(stat.topic): + reactions_by_topic[stat.shout] = 0 + reactions_by_topic[stat.shout] += stat.count + async with ReactionsStorage.lock: + ReactionsStorage.reactions_by_topic = reactions_by_topic + + @staticmethod + async def recent() -> list[Reaction]: + async with ReactionsStorage.lock: + return ReactionsStorage.reactions.sort(key=lambda x: x.createdAt, reverse=True) + + @staticmethod + async def total() -> int: + async with ReactionsStorage.lock: + return len(ReactionsStorage.reactions) + + @staticmethod + async def by_shout(shout) -> int: + async with ReactionsStorage.lock: + stat = ReactionsStorage.reactions_by_shout.get(shout) + stat = stat if stat else 0 + return stat + + @staticmethod + async def shout_rating(shout): + async with ReactionsStorage.lock: + return ReactionsStorage.rating_by_shout.get(shout) + + @staticmethod + async def by_author(slug) -> int: + async with ReactionsStorage.lock: + stat = ReactionsStorage.reactions_by_author.get(slug) + stat = stat if stat else 0 + return stat + + @staticmethod + async def by_topic(topic) -> int: + async with ReactionsStorage.lock: + stat = ReactionsStorage.reactions_by_topic.get(topic) + stat = stat if stat else 0 + return stat + + @staticmethod + async def worker(): + while True: + try: + with local_session() as session: + await ReactionsStorage.prepare_all(session) + await ReactionsStorage.prepare_by_shout(session) + await ReactionsStorage.calc_ratings(session) + await ReactionsStorage.prepare_by_topic(session) + print("[storage.reactions] updated") + except Exception as err: + print("[storage.reactions] errror: %s" % (err)) + await asyncio.sleep(ReactionsStorage.period) diff --git a/storages/roles.py b/storages/roles.py new file mode 100644 index 00000000..650a974a --- /dev/null +++ b/storages/roles.py @@ -0,0 +1,36 @@ + +import asyncio +from sqlalchemy.orm import selectinload + +from orm.rbac import Role + +class RoleStorage: + roles = {} + lock = asyncio.Lock() + + @staticmethod + def init(session): + self = RoleStorage + roles = session.query(Role).\ + options(selectinload(Role.permissions)).all() + self.roles = dict([(role.id, role) for role in roles]) + print('[storage.roles] %d ' % len(roles)) + + + @staticmethod + async def get_role(id): + self = RoleStorage + async with self.lock: + return self.roles.get(id) + + @staticmethod + async def add_role(role): + self = RoleStorage + async with self.lock: + self.roles[id] = role + + @staticmethod + async def del_role(id): + self = RoleStorage + async with self.lock: + del self.roles[id] \ No newline at end of file diff --git a/storages/shoutauthor.py b/storages/shoutauthor.py new file mode 100644 index 00000000..4f0ed5b0 --- /dev/null +++ b/storages/shoutauthor.py @@ -0,0 +1,42 @@ + +import asyncio +from orm.base import local_session +from orm.shout import ShoutAuthor + + +class ShoutAuthorStorage: + authors_by_shout = {} + lock = asyncio.Lock() + period = 30*60 #sec + + @staticmethod + async def load(session): + self = ShoutAuthorStorage + authors = session.query(ShoutAuthor) + for author in authors: + user = author.user + shout = author.shout + if shout in self.authors_by_shout: + self.authors_by_shout[shout].append(user) + else: + self.authors_by_shout[shout] = [user] + print('[storage.shoutauthor] %d shouts ' % len(self.authors_by_shout)) + + @staticmethod + async def get_authors(shout): + self = ShoutAuthorStorage + async with self.lock: + return self.authors_by_shout.get(shout, []) + + @staticmethod + async def worker(): + self = ShoutAuthorStorage + while True: + try: + with local_session() as session: + async with self.lock: + await self.load(session) + print("[storage.shoutauthor] updated") + except Exception as err: + print("[storage.shoutauthor] errror: %s" % (err)) + await asyncio.sleep(self.period) \ No newline at end of file diff --git a/storages/shoutscache.py b/storages/shoutscache.py new file mode 100644 index 00000000..45ec0fc4 --- /dev/null +++ b/storages/shoutscache.py @@ -0,0 +1,150 @@ + +import asyncio +from datetime import datetime, timedelta +from sqlalchemy import and_, desc, func, select +from sqlalchemy.orm import selectinload +from orm.base import local_session +from orm.reaction import Reaction +from orm.shout import Shout +from storages.reactions import ReactionsStorage +from storages.viewed import ViewedByDay + + +class ShoutsCache: + limit = 200 + period = 60*60 #1 hour + lock = asyncio.Lock() + + @staticmethod + async def prepare_recent_published(): + with local_session() as session: + stmt = select(Shout).\ + options(selectinload(Shout.authors), selectinload(Shout.topics)).\ + where(Shout.publishedAt != None).\ + order_by(desc("publishedAt")).\ + limit(ShoutsCache.limit) + shouts = [] + for row in session.execute(stmt): + shout = row.Shout + shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shouts.append(shout) + async with ShoutsCache.lock: + ShoutsCache.recent_published = shouts + print("[storage.shoutscache] %d recently published shouts " % len(shouts)) + + @staticmethod + async def prepare_recent_all(): + with local_session() as session: + stmt = select(Shout).\ + options(selectinload(Shout.authors), selectinload(Shout.topics)).\ + order_by(desc("createdAt")).\ + limit(ShoutsCache.limit) + shouts = [] + for row in session.execute(stmt): + shout = row.Shout + shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shouts.append(shout) + async with ShoutsCache.lock: + ShoutsCache.recent_all = shouts + print("[storage.shoutscache] %d recently created shouts " % len(shouts)) + + @staticmethod + async def prepare_recent_reacted(): + with local_session() as session: + stmt = select(Shout, func.max(Reaction.createdAt).label("reactionCreatedAt")).\ + options(selectinload(Shout.authors), selectinload(Shout.topics)).\ + join(Reaction).\ + where(and_(Shout.publishedAt != None, Reaction.deletedAt == None)).\ + group_by(Shout.slug).\ + order_by(desc("reactionCreatedAt")).\ + limit(ShoutsCache.limit) + shouts = [] + for row in session.execute(stmt): + shout = row.Shout + shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shouts.append(shout) + async with ShoutsCache.lock: + ShoutsCache.recent_reacted = shouts + print("[storage.shoutscache] %d recently reacted shouts " % len(shouts)) + + + @staticmethod + async def prepare_top_overall(): + with local_session() as session: + # with reacted times counter + stmt = select(Shout, + func.count(Reaction.id).label("reacted")).\ + options(selectinload(Shout.authors), selectinload(Shout.topics), selectinload(Shout.reactions)).\ + join(Reaction).\ + where(and_(Shout.publishedAt != None, Reaction.deletedAt == None)).\ + group_by(Shout.slug).\ + order_by(desc("reacted")).\ + limit(ShoutsCache.limit) + shouts = [] + # with rating synthetic counter + for row in session.execute(stmt): + shout = row.Shout + shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shouts.append(shout) + shouts.sort(key = lambda shout: shout.rating, reverse = True) + async with ShoutsCache.lock: + print("[storage.shoutscache] %d top shouts " % len(shouts)) + ShoutsCache.top_overall = shouts + + @staticmethod + async def prepare_top_month(): + month_ago = datetime.now() - timedelta(days = 30) + with local_session() as session: + stmt = select(Shout, func.count(Reaction.id).label("reacted")).\ + options(selectinload(Shout.authors), selectinload(Shout.topics)).\ + join(Reaction).\ + where(and_(Shout.createdAt > month_ago, Shout.publishedAt != None)).\ + group_by(Shout.slug).\ + order_by(desc("reacted")).\ + limit(ShoutsCache.limit) + shouts = [] + for row in session.execute(stmt): + shout = row.Shout + shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shouts.append(shout) + shouts.sort(key = lambda shout: shout.rating, reverse = True) + async with ShoutsCache.lock: + print("[storage.shoutscache] %d top month shouts " % len(shouts)) + ShoutsCache.top_month = shouts + + @staticmethod + async def prepare_top_viewed(): + month_ago = datetime.now() - timedelta(days = 30) + with local_session() as session: + stmt = select(Shout, func.sum(ViewedByDay.value).label("viewed")).\ + options(selectinload(Shout.authors), selectinload(Shout.topics)).\ + join(ViewedByDay).\ + where(and_(ViewedByDay.day > month_ago, Shout.publishedAt != None)).\ + group_by(Shout.slug).\ + order_by(desc("viewed")).\ + limit(ShoutsCache.limit) + shouts = [] + for row in session.execute(stmt): + shout = row.Shout + shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0 + shouts.append(shout) + # shouts.sort(key = lambda shout: shout.viewed, reverse = True) + async with ShoutsCache.lock: + print("[storage.shoutscache] %d top viewed shouts " % len(shouts)) + ShoutsCache.top_viewed = shouts + + @staticmethod + async def worker(): + while True: + try: + await ShoutsCache.prepare_top_month() + await ShoutsCache.prepare_top_overall() + await ShoutsCache.prepare_top_viewed() + await ShoutsCache.prepare_recent_published() + await ShoutsCache.prepare_recent_all() + await ShoutsCache.prepare_recent_reacted() + print("[storage.shoutscache] updated") + except Exception as err: + print("[storage.shoutscache] error: %s" % (err)) + raise err + await asyncio.sleep(ShoutsCache.period) diff --git a/storages/topics.py b/storages/topics.py new file mode 100644 index 00000000..adbca2fa --- /dev/null +++ b/storages/topics.py @@ -0,0 +1,57 @@ + +import asyncio +from orm.topic import Topic + + +class TopicStorage: + topics = {} + lock = asyncio.Lock() + + @staticmethod + def init(session): + self = TopicStorage + topics = session.query(Topic) + self.topics = dict([(topic.slug, topic) for topic in topics]) + for topic in self.topics.values(): + self.load_parents(topic) # TODO: test + + print('[storage.topics] %d ' % len(self.topics.keys())) + + @staticmethod + def load_parents(topic): + self = TopicStorage + parents = [] + for parent in self.topics.values(): + if topic.slug in parent.children: + parents.append(parent.slug) + topic.parents = parents + return topic + + @staticmethod + async def get_topics_all(): + self = TopicStorage + async with self.lock: + return self.topics.values() + + @staticmethod + async def get_topics_by_slugs(slugs): + self = TopicStorage + async with self.lock: + if not slugs: + return self.topics.values() + topics = filter(lambda topic: topic.slug in slugs, self.topics.values()) + return list(topics) + + @staticmethod + async def get_topics_by_community(community): + self = TopicStorage + async with self.lock: + topics = filter(lambda topic: topic.community == community, self.topics.values()) + return list(topics) + + @staticmethod + async def add_topic(topic): + self = TopicStorage + async with self.lock: + self.topics[topic.slug] = topic + self.load_parents(topic) \ No newline at end of file diff --git a/storages/topicstat.py b/storages/topicstat.py new file mode 100644 index 00000000..2f9d39f0 --- /dev/null +++ b/storages/topicstat.py @@ -0,0 +1,85 @@ + + +import asyncio +from orm.base import local_session +from storages.shoutauthor import ShoutAuthorStorage +from orm.topic import ShoutTopic, TopicFollower + + +class TopicStat: + shouts_by_topic = {} + authors_by_topic = {} + followers_by_topic = {} + reactions_by_topic = {} + lock = asyncio.Lock() + period = 30*60 #sec + + @staticmethod + async def load_stat(session): + self = TopicStat + self.shouts_by_topic = {} + self.authors_by_topic = {} + shout_topics = session.query(ShoutTopic) + for shout_topic in shout_topics: + topic = shout_topic.topic + shout = shout_topic.shout + if topic in self.shouts_by_topic: + self.shouts_by_topic[topic].append(shout) + else: + self.shouts_by_topic[topic] = [shout] + + authors = await ShoutAuthorStorage.get_authors(shout) + if topic in self.authors_by_topic: + self.authors_by_topic[topic].update(authors) + else: + self.authors_by_topic[topic] = set(authors) + + print('[storage.topicstat] authors sorted') + print('[storage.topicstat] shouts sorted') + + self.followers_by_topic = {} + followings = session.query(TopicFollower) + for flw in followings: + topic = flw.topic + user = flw.follower + if topic in self.followers_by_topic: + self.followers_by_topic[topic].append(user) + else: + self.followers_by_topic[topic] = [user] + print('[storage.topicstat] followers sorted') + + @staticmethod + async def get_shouts(topic): + self = TopicStat + async with self.lock: + return self.shouts_by_topic.get(topic, []) + + @staticmethod + async def get_stat(topic) -> dict: + self = TopicStat + async with self.lock: + shouts = self.shouts_by_topic.get(topic, []) + followers = self.followers_by_topic.get(topic, []) + authors = self.authors_by_topic.get(topic, []) + reactions = self.reactions_by_topic.get(topic, []) + + return { + "shouts" : len(shouts), + "authors" : len(authors), + "followers" : len(followers), + "reactions" : len(reactions) + } + + @staticmethod + async def worker(): + self = TopicStat + while True: + try: + with local_session() as session: + async with self.lock: + await self.load_stat(session) + print("[storage.topicstat] updated") + except Exception as err: + print("[storage.topicstat] errror: %s" % (err)) + await asyncio.sleep(self.period) + diff --git a/storages/users.py b/storages/users.py new file mode 100644 index 00000000..8dd83044 --- /dev/null +++ b/storages/users.py @@ -0,0 +1,43 @@ + +import asyncio +from sqlalchemy.orm import selectinload +from orm.user import User + + +class UserStorage: + users = {} + lock = asyncio.Lock() + + @staticmethod + def init(session): + self = UserStorage + users = session.query(User).\ + options(selectinload(User.roles)).all() + self.users = dict([(user.id, user) for user in users]) + print('[storage.users] %d ' % len(self.users)) + + @staticmethod + async def get_user(id): + self = UserStorage + async with self.lock: + return self.users.get(id) + + @staticmethod + async def get_user_by_slug(slug): + self = UserStorage + async with self.lock: + for user in self.users.values(): + if user.slug == slug: + return user + + @staticmethod + async def add_user(user): + self = UserStorage + async with self.lock: + self.users[user.id] = user + + @staticmethod + async def del_user(id): + self = UserStorage + async with self.lock: + del self.users[id] \ No newline at end of file diff --git a/storages/viewed.py b/storages/viewed.py new file mode 100644 index 00000000..8a2f379d --- /dev/null +++ b/storages/viewed.py @@ -0,0 +1,122 @@ + +import asyncio +from datetime import datetime +from sqlalchemy import Column, DateTime, ForeignKey, Integer +from sqlalchemy.orm.attributes import flag_modified +from orm.base import Base, local_session + + +class ViewedByDay(Base): + __tablename__ = "viewed_by_day" + + id = None + shout = Column(ForeignKey('shout.slug'), primary_key=True) + day = Column(DateTime, primary_key=True, default=datetime.now) + value = Column(Integer) + + +class ViewedStorage: + viewed = { + 'shouts': {}, + # TODO: ? 'reactions': {}, + 'topics': {} # TODO: get sum views for all shouts in topic + } + this_day_views = {} + to_flush = [] + period = 30*60 # sec + lock = asyncio.Lock() + + @staticmethod + def init(session): + self = ViewedStorage + views = session.query(ViewedByDay).all() + + for view in views: + shout = view.shout + value = view.value + if shout: + old_value = self.viewed['shouts'].get(shout, 0) + self.viewed['shouts'][shout] = old_value + value + if not shout in self.this_day_views: + self.this_day_views[shout] = view + this_day_view = self.this_day_views[shout] + if this_day_view.day < view.day: + self.this_day_views[shout] = view + + print('[storage.viewed] watching %d shouts' % len(views)) + # TODO: add reactions ? + + @staticmethod + async def get_shout(shout_slug): + self = ViewedStorage + async with self.lock: + return self.viewed['shouts'].get(shout_slug, 0) + + # NOTE: this method is never called + @staticmethod + async def get_reaction(reaction_id): + self = ViewedStorage + async with self.lock: + return self.viewed['reactions'].get(reaction_id, 0) + + @staticmethod + async def inc_shout(shout_slug): + self = ViewedStorage + async with self.lock: + this_day_view = self.this_day_views.get(shout_slug) + day_start = datetime.now().replace(hour=0, minute=0, second=0) + if not this_day_view or this_day_view.day < day_start: + if this_day_view and getattr(this_day_view, "modified", False): + self.to_flush.append(this_day_view) + this_day_view = ViewedByDay.create(shout=shout_slug, value=1) + self.this_day_views[shout_slug] = this_day_view + else: + this_day_view.value = this_day_view.value + 1 + this_day_view.modified = True + old_value = self.viewed['shouts'].get(shout_slug, 0) + self.viewed['shotus'][shout_slug] = old_value + 1 + + @staticmethod + async def inc_reaction(shout_slug, reaction_id): + self = ViewedStorage + async with self.lock: + this_day_view = self.this_day_views.get(reaction_id) + day_start = datetime.now().replace(hour=0, minute=0, second=0) + if not this_day_view or this_day_view.day < day_start: + if this_day_view and getattr(this_day_view, "modified", False): + self.to_flush.append(this_day_view) + this_day_view = ViewedByDay.create( + shout=shout_slug, reaction=reaction_id, value=1) + self.this_day_views[shout_slug] = this_day_view + else: + this_day_view.value = this_day_view.value + 1 + this_day_view.modified = True + old_value = self.viewed['shouts'].get(shout_slug, 0) + self.viewed['shouts'][shout_slug] = old_value + 1 + old_value = self.viewed['reactions'].get(shout_slug, 0) + self.viewed['reaction'][reaction_id] = old_value + 1 + + @staticmethod + async def flush_changes(session): + self = ViewedStorage + async with self.lock: + for view in self.this_day_views.values(): + if getattr(view, "modified", False): + session.add(view) + flag_modified(view, "value") + view.modified = False + for view in self.to_flush: + session.add(view) + self.to_flush.clear() + session.commit() + + @staticmethod + async def worker(): + while True: + try: + with local_session() as session: + await ViewedStorage.flush_changes(session) + print("[storage.viewed] storage flushed changes") + except Exception as err: + print("[storage.viewed] errror: %s" % (err)) + await asyncio.sleep(ViewedStorage.period)