From ace22f5e8b1f9acbfe16b4baf4f5607634f4c145 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Tue, 21 Jun 2022 15:21:02 +0300 Subject: [PATCH] refactored-shout-comments --- .vscode/settings.json | 11 ++++++++ inbox_resolvers/inbox.py | 24 ++++++++--------- migrate.py | 3 ++- migration/tables/comments.py | 2 +- migration/tables/users.py | 10 +++++--- orm/__init__.py | 2 +- orm/proposal.py | 2 +- orm/shout.py | 8 ++++++ resolvers/__init__.py | 6 +++-- resolvers/collab.py | 14 +++++----- resolvers/comments.py | 50 ++++++++++++++++++------------------ resolvers/editor.py | 2 +- resolvers/profile.py | 25 +++++------------- resolvers/zine.py | 31 ++++++---------------- schema.graphql | 2 +- 15 files changed, 96 insertions(+), 96 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..93336048 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +{ + "sqltools.connections": [ + { + "previewLimit": 50, + "driver": "SQLite", + "database": "${workspaceFolder:discours-backend}/db.sqlite3", + "name": "local-discours-backend" + } + ], + "sqltools.useNodeRuntime": true +} \ No newline at end of file diff --git a/inbox_resolvers/inbox.py b/inbox_resolvers/inbox.py index 9a55ec94..7844579d 100644 --- a/inbox_resolvers/inbox.py +++ b/inbox_resolvers/inbox.py @@ -16,24 +16,24 @@ class MessageSubscription: def __init__(self, chat_id): self.chat_id = chat_id -class MessageSubscriptions: +class MessagesStorage: lock = asyncio.Lock() subscriptions = [] @staticmethod async def register_subscription(subs): - async with MessageSubscriptions.lock: - MessageSubscriptions.subscriptions.append(subs) + async with MessagesStorage.lock: + MessagesStorage.subscriptions.append(subs) @staticmethod async def del_subscription(subs): - async with MessageSubscriptions.lock: - MessageSubscriptions.subscriptions.remove(subs) + async with MessagesStorage.lock: + MessagesStorage.subscriptions.remove(subs) @staticmethod async def put(message_result): - async with MessageSubscriptions.lock: - for subs in MessageSubscriptions.subscriptions: + async with MessagesStorage.lock: + for subs in MessagesStorage.subscriptions: if message_result.message["chatId"] == subs.chat_id: subs.queue.put_nowait(message_result) @@ -165,7 +165,7 @@ async def create_message(_, info, chatId, body, replyTo = None): await redis.execute("LPUSH", f"chats/{chatId}/unread/{user_slug}", str(message_id)) result = MessageResult("NEW", new_message) - await MessageSubscriptions.put(result) + await MessagesStorage.put(result) return {"message" : new_message} @@ -203,7 +203,7 @@ async def update_message(_, info, chatId, id, body): await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message)) result = MessageResult("UPDATED", message) - await MessageSubscriptions.put(result) + await MessagesStorage.put(result) return {"message" : message} @@ -232,7 +232,7 @@ async def delete_message(_, info, chatId, id): await redis.execute("LREM", f"chats/{chatId}/unread/{user_slug}", 0, str(id)) result = MessageResult("DELETED", message) - await MessageSubscriptions.put(result) + await MessagesStorage.put(result) return {} @@ -265,12 +265,12 @@ async def message_generator(obj, info, chatId): try: subs = MessageSubscription(chatId) - await MessageSubscriptions.register_subscription(subs) + await MessagesStorage.register_subscription(subs) while True: msg = await subs.queue.get() yield msg finally: - await MessageSubscriptions.del_subscription(subs) + await MessagesStorage.del_subscription(subs) @subscription.field("chatUpdated") def message_resolver(message, info, chatId): diff --git a/migrate.py b/migrate.py index 7f70dcc1..70308570 100644 --- a/migrate.py +++ b/migrate.py @@ -64,6 +64,7 @@ def users(users_by_oid, users_by_slug, users_data): users_by_slug[user['slug']] = user # public id_map[user['old_id']] = user['slug'] counter += 1 + print(' - * - stage 2 users migration - * -') for entry in users_data: migrateUser_2stage(entry, id_map) try: @@ -72,7 +73,7 @@ def users(users_by_oid, users_by_slug, users_data): print(str(len(users_by_slug.items())) + ' users migrated') except Exception: print('json dump error') - print(users_by_oid) + # print(users_by_oid) def topics(export_topics, topics_by_slug, topics_by_oid, cats_data, tags_data): diff --git a/migration/tables/comments.py b/migration/tables/comments.py index 21a9982a..bcab0619 100644 --- a/migration/tables/comments.py +++ b/migration/tables/comments.py @@ -59,7 +59,7 @@ def migrate(entry, shouts_by_oid): comment_dict['deletedBy'] = str(entry['updatedBy']) if entry.get('updatedAt'): comment_dict['updatedAt'] = date_parse(entry['updatedAt']) - # comment_dict['updatedBy'] = str(entry.get('updatedBy', 0)) invalid keyword for Comment + # comment_dict['updatedBy'] = str(entry.get('updatedBy', 0)) invalid keyword for Comment # print(comment_dict) comment = Comment.create(**comment_dict) comment_dict['id'] = comment.id diff --git a/migration/tables/users.py b/migration/tables/users.py index f7c30cef..69ee89d1 100644 --- a/migration/tables/users.py +++ b/migration/tables/users.py @@ -33,6 +33,8 @@ def migrate(entry): res = {} res['old_id'] = entry['_id'] res['password'] = entry['services']['password'].get('bcrypt', '') + del entry['services'] + del entry['subscribedTo'] res['username'] = entry['emails'][0]['address'] res['email'] = res['username'] res['wasOnlineAt'] = parse(entry.get('loggedInAt', entry['createdAt'])) @@ -43,12 +45,11 @@ def migrate(entry): res['notifications'] = [] res['links'] = [] res['muted'] = False - res['bio'] = html2text(entry.get('bio', '')) res['name'] = 'anonymous' - if not res['bio'].strip() or res['bio'] == '\n': del res['bio'] if entry.get('profile'): # slug res['slug'] = entry['profile'].get('path') + res['bio'] = entry['profile'].get('bio','') # userpic try: res['userpic'] = 'https://assets.discours.io/unsafe/100x/' + entry['profile']['thumborId'] @@ -86,7 +87,9 @@ def migrate(entry): old = res['old_id'] user = User.create(**res.copy()) res['id'] = user.id - + if res['slug'] == 'vorovich': + print(entry) + print(res) return res def migrate_email_subscription(entry): @@ -101,6 +104,7 @@ def migrate_2stage(entry, id_map): rater_old_id = rating_entry['createdBy'] rater_slug = id_map.get(rater_old_id) if not rater_slug: + print(rating_entry) continue old_id = entry['_id'] user_rating_dict = { diff --git a/orm/__init__.py b/orm/__init__.py index 958a9310..0f37ca12 100644 --- a/orm/__init__.py +++ b/orm/__init__.py @@ -4,7 +4,7 @@ from orm.user import User, UserRating, UserRole, UserStorage from orm.topic import Topic, TopicSubscription, TopicStorage from orm.notification import Notification from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay,\ - ShoutRatingStorage, ShoutViewStorage + ShoutRatingStorage, ShoutViewStorage, ShoutCommentsSubscription from orm.base import Base, engine, local_session from orm.comment import Comment, CommentRating #, CommentRatingStorage from orm.proposal import Proposal, ProposalRating #, ProposalRatingStorage diff --git a/orm/proposal.py b/orm/proposal.py index 5b4d81f4..aef6fa1c 100644 --- a/orm/proposal.py +++ b/orm/proposal.py @@ -7,7 +7,7 @@ from orm.base import Base class ProposalRating(Base): - __tablename__ = "comment_rating" + __tablename__ = "proposal_rating" id = None proposal_id = Column(ForeignKey('proposal.id'), primary_key = True) diff --git a/orm/shout.py b/orm/shout.py index 0631c7bc..de6f1d6b 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -11,6 +11,14 @@ from functools import reduce import asyncio +class ShoutCommentsSubscription(Base): + __tablename__ = "shout_comments_subscription" + + id = None + subscriber = Column(ForeignKey('user.slug'), primary_key = True) + shout = Column(ForeignKey('shout.slug'), primary_key = True) + createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at") + class ShoutAuthor(Base): __tablename__ = "shout_author" diff --git a/resolvers/__init__.py b/resolvers/__init__.py index 3e818f73..736682fe 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -2,7 +2,7 @@ from resolvers.auth import login, sign_out, is_email_used, register, confirm, au from resolvers.zine import get_shout_by_slug, subscribe, unsubscribe, view_shout, rate_shout, \ top_month, top_overall, recent_published, recent_all, top_viewed, \ shouts_by_authors, shouts_by_topics, shouts_by_communities -from resolvers.profile import get_users_by_slugs, get_current_user, shouts_reviewed, shouts_subscribed +from resolvers.profile import get_users_by_slugs, get_current_user, shouts_reviewed, shout_comments_subscribed from resolvers.topics import topic_subscribe, topic_unsubscribe, topics_by_author, \ topics_by_community, topics_by_slugs from resolvers.comments import create_comment, delete_comment, update_comment, rate_comment @@ -30,7 +30,7 @@ __all__ = [ "shouts_by_topics", "shouts_by_authors", "shouts_by_communities", - "shouts_subscribed", + "shout_comments_subscribed", "shouts_reviewed", "top_month", "top_overall", @@ -59,6 +59,8 @@ __all__ = [ # comments "get_shout_comments", + "comments_subscribe", + "comments_unsubscribe", "create_comment", "update_comment", "delete_comment", diff --git a/resolvers/collab.py b/resolvers/collab.py index 929f65f4..733fbf7a 100644 --- a/resolvers/collab.py +++ b/resolvers/collab.py @@ -44,7 +44,7 @@ async def create_proposal(_, info, body, shout, range = None): ) result = ProposalResult("NEW", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {"proposal": proposal} @@ -67,7 +67,7 @@ async def update_proposal(_, info, id, body): session.commit() result = ProposalResult("UPDATED", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {"proposal": proposal} @@ -88,7 +88,7 @@ async def delete_proposal(_, info, id): session.commit() result = ProposalResult("DELETED", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {} @@ -109,7 +109,7 @@ async def disable_proposal(_, info, id): session.commit() result = ProposalResult("DISABLED", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {} @@ -137,7 +137,7 @@ async def rate_proposal(_, info, id, value): value = value) result = ProposalResult("UPDATED_RATING", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {} @@ -162,7 +162,7 @@ async def accept_proposal(_, info, id): session.commit() result = ProposalResult("ACCEPTED", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {} @@ -186,6 +186,6 @@ async def decline_proposal(_, info, id): session.commit() result = ProposalResult("DECLINED", proposal) - await ProposalSubscriptions.put(result) + await ProposalStorage.put(result) return {} diff --git a/resolvers/comments.py b/resolvers/comments.py index b47137c2..29a4163c 100644 --- a/resolvers/comments.py +++ b/resolvers/comments.py @@ -1,5 +1,6 @@ from orm import Comment, CommentRating from orm.base import local_session +from orm.shout import ShoutCommentsSubscription from resolvers.base import mutation, query, subscription from auth.authenticate import login_required import asyncio @@ -10,37 +11,51 @@ class CommentResult: self.status = status self.comment = comment -class CommentSubscription: +class ShoutCommentsSubscription: queue = asyncio.Queue() def __init__(self, shout_slug): self.shout_slug = shout_slug -#TODO: one class for MessageSubscription and CommentSubscription -class CommentSubscriptions: +class ShoutCommentsStorage: lock = asyncio.Lock() subscriptions = [] @staticmethod async def register_subscription(subs): - self = CommentSubscriptions + self = ShoutCommentsStorage async with self.lock: self.subscriptions.append(subs) @staticmethod async def del_subscription(subs): - self = CommentSubscriptions + self = ShoutCommentsStorage async with self.lock: self.subscriptions.remove(subs) @staticmethod async def put(comment_result): - self = CommentSubscriptions + self = ShoutCommentsStorage async with self.lock: for subs in self.subscriptions: if comment_result.comment.shout == subs.shout_slug: subs.queue.put_nowait(comment_result) +def comments_subscribe(user, slug): + ShoutCommentsSubscription.create( + subscriber = user.slug, + shout = slug) + +def comments_unsubscribe(user, slug): + with local_session() as session: + sub = session.query(ShoutCommentsSubscription).\ + filter(and_(ShoutCommentsSubscription.subscriber == user.slug, ShoutCommentsSubscription.shout == slug)).\ + first() + if not sub: + raise Exception("subscription not exist") + session.delete(sub) + session.commit() + @mutation.field("createComment") @login_required async def create_comment(_, info, body, shout, replyTo = None): @@ -55,7 +70,7 @@ async def create_comment(_, info, body, shout, replyTo = None): ) result = CommentResult("NEW", comment) - await CommentSubscriptions.put(result) + await ShoutCommentsStorage.put(result) return {"comment": comment} @@ -78,7 +93,7 @@ async def update_comment(_, info, id, body): session.commit() result = CommentResult("UPDATED", comment) - await CommentSubscriptions.put(result) + await ShoutCommentsStorage.put(result) return {"comment": comment} @@ -99,7 +114,7 @@ async def delete_comment(_, info, id): session.commit() result = CommentResult("DELETED", comment) - await CommentSubscriptions.put(result) + await ShoutCommentsStorage.put(result) return {} @@ -127,21 +142,6 @@ async def rate_comment(_, info, id, value): value = value) result = CommentResult("UPDATED_RATING", comment) - await CommentSubscriptions.put(result) + await ShoutCommentsStorage.put(result) return {} - -@subscription.source("commentUpdated") -async def comment_generator(obj, info, shout): - try: - subs = CommentSubscription(shout) - await CommentSubscriptions.register_subscription(subs) - while True: - result = await subs.queue.get() - yield result - finally: - await CommentSubscriptions.del_subscription(subs) - -@subscription.field("commentUpdated") -def comment_resolver(result, info, shout): - return result diff --git a/resolvers/editor.py b/resolvers/editor.py index 17d4c841..86e7654c 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -36,7 +36,7 @@ async def create_shout(_, info, input): "new shout %s" % (new_shout.slug) ) - await ShoutSubscriptions.send_shout(new_shout) + await ShoutCommentsStorage.send_shout(new_shout) return { "shout" : new_shout diff --git a/resolvers/profile.py b/resolvers/profile.py index 43ed42d1..c22c3b1f 100644 --- a/resolvers/profile.py +++ b/resolvers/profile.py @@ -4,8 +4,6 @@ from orm.comment import Comment from orm.base import local_session from orm.topic import Topic, TopicSubscription from resolvers.base import mutation, query, subscription -from resolvers.topics import topic_subscribe, topic_unsubscribe -from resolvers.community import community_subscribe, community_unsubscribe, get_subscribed_communities from auth.authenticate import login_required from inbox_resolvers.inbox import get_total_unread_messages_for_user @@ -205,25 +203,16 @@ async def shouts_reviewed(_, info, page, size): return shouts -@query.field("shoutsSubscribed") +@query.field("shoutCommentsSubscribed") @login_required -async def shouts_subscribed(_, info, page, size): +async def shout_comments_subscribed(_, info, slug, page, size): user = info.context["request"].user with local_session() as session: - shouts_by_topic = session.query(Shout).\ - join(ShoutTopic).\ - join(TopicSubscription, ShoutTopic.topic == TopicSubscription.topic).\ - where(TopicSubscription.subscriber == user.slug) - shouts_by_author = session.query(Shout).\ - join(ShoutAuthor).\ - join(AuthorSubscription, ShoutAuthor.user == AuthorSubscription.author).\ - where(AuthorSubscription.subscriber == user.slug) - shouts_by_community = session.query(Shout).\ - join(Community).\ - join(CommunitySubscription).\ - where(CommunitySubscription.subscriber == user.slug) - shouts = shouts_by_topic.union(shouts_by_author).\ - union(shouts_by_community).\ + comments_by_shout = session.query(Comment).\ + join(ShoutCommentsSubscription).\ + join(ShoutCommentsSubscription, ShoutCommentsSubscription.shout == slug).\ + where(ShoutCommentsSubscription.subscriber == user.slug) + comments = comments_by_shout.\ order_by(desc(Shout.createdAt)).\ limit(size).\ offset( (page - 1) * size) diff --git a/resolvers/zine.py b/resolvers/zine.py index e7d550ec..9a94872c 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -1,12 +1,13 @@ -from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\ - ShoutRatingStorage, ShoutViewStorage, Comment, CommentRating, Topic +from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, \ + User, Community, Resource, ShoutRatingStorage, ShoutViewStorage, \ + Comment, CommentRating, Topic, ShoutCommentsSubscription from orm.community import CommunitySubscription from orm.base import local_session from orm.user import UserStorage, AuthorSubscription from orm.topic import TopicSubscription from resolvers.base import mutation, query - +from resolvers.comments import comments_subscribe, comments_unsubscribe from auth.authenticate import login_required from settings import SHOUTS_REPO @@ -207,26 +208,6 @@ class ShoutsCache: print("shouts cache worker error = %s" % (err)) await asyncio.sleep(ShoutsCache.period) -class ShoutSubscriptions: - lock = asyncio.Lock() - subscriptions = [] - - @staticmethod - async def register_subscription(subs): - async with ShoutSubscriptions.lock: - ShoutSubscriptions.subscriptions.append(subs) - - @staticmethod - async def del_subscription(subs): - async with ShoutSubscriptions.lock: - ShoutSubscriptions.subscriptions.remove(subs) - - @staticmethod - async def send_shout(shout): - async with ShoutSubscriptions.lock: - for subs in ShoutSubscriptions.subscriptions: - subs.put_nowait(shout) - @query.field("topViewed") async def top_viewed(_, info, page, size): async with ShoutsCache.lock: @@ -344,6 +325,8 @@ async def subscribe(_, info, subscription, slug): topic_subscribe(user, slug) elif subscription == "COMMUNITY": community_subscribe(user, slug) + elif comments_subscription == "COMMENTS": + comments_subscribe(user, slug) except Exception as e: return {"error" : e} @@ -361,6 +344,8 @@ async def unsubscribe(_, info, subscription, slug): topic_unsubscribe(user, slug) elif subscription == "COMMUNITY": community_unsubscribe(user, slug) + elif subscription == "COMMENTS": + comments_unsubscribe(user, slug) except Exception as e: return {"error" : e} diff --git a/schema.graphql b/schema.graphql index 45f2b1e9..81ee53a4 100644 --- a/schema.graphql +++ b/schema.graphql @@ -201,7 +201,7 @@ type Query { getCommunity(slug: String): Community! getCommunities: [Community]! - shoutsSubscribed(page: Int!, size: Int!): [Shout]! + shoutCommentsSubscribed(slug: String!, page: Int!, size: Int!): [Shout]! shoutsReviewed(page: Int!, size: Int!): [Shout]! recentCommented(page: Int!, size: Int!): [Shout]! }