refactored-shout-comments

This commit is contained in:
tonyrewin 2022-06-21 15:21:02 +03:00
parent 11f81d46ce
commit ace22f5e8b
15 changed files with 96 additions and 96 deletions

11
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,11 @@
{
"sqltools.connections": [
{
"previewLimit": 50,
"driver": "SQLite",
"database": "${workspaceFolder:discours-backend}/db.sqlite3",
"name": "local-discours-backend"
}
],
"sqltools.useNodeRuntime": true
}

View File

@ -16,24 +16,24 @@ class MessageSubscription:
def __init__(self, chat_id):
self.chat_id = chat_id
class MessageSubscriptions:
class MessagesStorage:
lock = asyncio.Lock()
subscriptions = []
@staticmethod
async def register_subscription(subs):
async with MessageSubscriptions.lock:
MessageSubscriptions.subscriptions.append(subs)
async with MessagesStorage.lock:
MessagesStorage.subscriptions.append(subs)
@staticmethod
async def del_subscription(subs):
async with MessageSubscriptions.lock:
MessageSubscriptions.subscriptions.remove(subs)
async with MessagesStorage.lock:
MessagesStorage.subscriptions.remove(subs)
@staticmethod
async def put(message_result):
async with MessageSubscriptions.lock:
for subs in MessageSubscriptions.subscriptions:
async with MessagesStorage.lock:
for subs in MessagesStorage.subscriptions:
if message_result.message["chatId"] == subs.chat_id:
subs.queue.put_nowait(message_result)
@ -165,7 +165,7 @@ async def create_message(_, info, chatId, body, replyTo = None):
await redis.execute("LPUSH", f"chats/{chatId}/unread/{user_slug}", str(message_id))
result = MessageResult("NEW", new_message)
await MessageSubscriptions.put(result)
await MessagesStorage.put(result)
return {"message" : new_message}
@ -203,7 +203,7 @@ async def update_message(_, info, chatId, id, body):
await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message))
result = MessageResult("UPDATED", message)
await MessageSubscriptions.put(result)
await MessagesStorage.put(result)
return {"message" : message}
@ -232,7 +232,7 @@ async def delete_message(_, info, chatId, id):
await redis.execute("LREM", f"chats/{chatId}/unread/{user_slug}", 0, str(id))
result = MessageResult("DELETED", message)
await MessageSubscriptions.put(result)
await MessagesStorage.put(result)
return {}
@ -265,12 +265,12 @@ async def message_generator(obj, info, chatId):
try:
subs = MessageSubscription(chatId)
await MessageSubscriptions.register_subscription(subs)
await MessagesStorage.register_subscription(subs)
while True:
msg = await subs.queue.get()
yield msg
finally:
await MessageSubscriptions.del_subscription(subs)
await MessagesStorage.del_subscription(subs)
@subscription.field("chatUpdated")
def message_resolver(message, info, chatId):

View File

@ -64,6 +64,7 @@ def users(users_by_oid, users_by_slug, users_data):
users_by_slug[user['slug']] = user # public
id_map[user['old_id']] = user['slug']
counter += 1
print(' - * - stage 2 users migration - * -')
for entry in users_data:
migrateUser_2stage(entry, id_map)
try:
@ -72,7 +73,7 @@ def users(users_by_oid, users_by_slug, users_data):
print(str(len(users_by_slug.items())) + ' users migrated')
except Exception:
print('json dump error')
print(users_by_oid)
# print(users_by_oid)
def topics(export_topics, topics_by_slug, topics_by_oid, cats_data, tags_data):

View File

@ -59,7 +59,7 @@ def migrate(entry, shouts_by_oid):
comment_dict['deletedBy'] = str(entry['updatedBy'])
if entry.get('updatedAt'):
comment_dict['updatedAt'] = date_parse(entry['updatedAt'])
# comment_dict['updatedBy'] = str(entry.get('updatedBy', 0)) invalid keyword for Comment
# comment_dict['updatedBy'] = str(entry.get('updatedBy', 0)) invalid keyword for Comment
# print(comment_dict)
comment = Comment.create(**comment_dict)
comment_dict['id'] = comment.id

View File

@ -33,6 +33,8 @@ def migrate(entry):
res = {}
res['old_id'] = entry['_id']
res['password'] = entry['services']['password'].get('bcrypt', '')
del entry['services']
del entry['subscribedTo']
res['username'] = entry['emails'][0]['address']
res['email'] = res['username']
res['wasOnlineAt'] = parse(entry.get('loggedInAt', entry['createdAt']))
@ -43,12 +45,11 @@ def migrate(entry):
res['notifications'] = []
res['links'] = []
res['muted'] = False
res['bio'] = html2text(entry.get('bio', ''))
res['name'] = 'anonymous'
if not res['bio'].strip() or res['bio'] == '\n': del res['bio']
if entry.get('profile'):
# slug
res['slug'] = entry['profile'].get('path')
res['bio'] = entry['profile'].get('bio','')
# userpic
try: res['userpic'] = 'https://assets.discours.io/unsafe/100x/' + entry['profile']['thumborId']
@ -86,7 +87,9 @@ def migrate(entry):
old = res['old_id']
user = User.create(**res.copy())
res['id'] = user.id
if res['slug'] == 'vorovich':
print(entry)
print(res)
return res
def migrate_email_subscription(entry):
@ -101,6 +104,7 @@ def migrate_2stage(entry, id_map):
rater_old_id = rating_entry['createdBy']
rater_slug = id_map.get(rater_old_id)
if not rater_slug:
print(rating_entry)
continue
old_id = entry['_id']
user_rating_dict = {

View File

@ -4,7 +4,7 @@ from orm.user import User, UserRating, UserRole, UserStorage
from orm.topic import Topic, TopicSubscription, TopicStorage
from orm.notification import Notification
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay,\
ShoutRatingStorage, ShoutViewStorage
ShoutRatingStorage, ShoutViewStorage, ShoutCommentsSubscription
from orm.base import Base, engine, local_session
from orm.comment import Comment, CommentRating #, CommentRatingStorage
from orm.proposal import Proposal, ProposalRating #, ProposalRatingStorage

View File

@ -7,7 +7,7 @@ from orm.base import Base
class ProposalRating(Base):
__tablename__ = "comment_rating"
__tablename__ = "proposal_rating"
id = None
proposal_id = Column(ForeignKey('proposal.id'), primary_key = True)

View File

@ -11,6 +11,14 @@ from functools import reduce
import asyncio
class ShoutCommentsSubscription(Base):
__tablename__ = "shout_comments_subscription"
id = None
subscriber = Column(ForeignKey('user.slug'), primary_key = True)
shout = Column(ForeignKey('shout.slug'), primary_key = True)
createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at")
class ShoutAuthor(Base):
__tablename__ = "shout_author"

View File

@ -2,7 +2,7 @@ from resolvers.auth import login, sign_out, is_email_used, register, confirm, au
from resolvers.zine import get_shout_by_slug, subscribe, unsubscribe, view_shout, rate_shout, \
top_month, top_overall, recent_published, recent_all, top_viewed, \
shouts_by_authors, shouts_by_topics, shouts_by_communities
from resolvers.profile import get_users_by_slugs, get_current_user, shouts_reviewed, shouts_subscribed
from resolvers.profile import get_users_by_slugs, get_current_user, shouts_reviewed, shout_comments_subscribed
from resolvers.topics import topic_subscribe, topic_unsubscribe, topics_by_author, \
topics_by_community, topics_by_slugs
from resolvers.comments import create_comment, delete_comment, update_comment, rate_comment
@ -30,7 +30,7 @@ __all__ = [
"shouts_by_topics",
"shouts_by_authors",
"shouts_by_communities",
"shouts_subscribed",
"shout_comments_subscribed",
"shouts_reviewed",
"top_month",
"top_overall",
@ -59,6 +59,8 @@ __all__ = [
# comments
"get_shout_comments",
"comments_subscribe",
"comments_unsubscribe",
"create_comment",
"update_comment",
"delete_comment",

View File

@ -44,7 +44,7 @@ async def create_proposal(_, info, body, shout, range = None):
)
result = ProposalResult("NEW", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {"proposal": proposal}
@ -67,7 +67,7 @@ async def update_proposal(_, info, id, body):
session.commit()
result = ProposalResult("UPDATED", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {"proposal": proposal}
@ -88,7 +88,7 @@ async def delete_proposal(_, info, id):
session.commit()
result = ProposalResult("DELETED", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {}
@ -109,7 +109,7 @@ async def disable_proposal(_, info, id):
session.commit()
result = ProposalResult("DISABLED", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {}
@ -137,7 +137,7 @@ async def rate_proposal(_, info, id, value):
value = value)
result = ProposalResult("UPDATED_RATING", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {}
@ -162,7 +162,7 @@ async def accept_proposal(_, info, id):
session.commit()
result = ProposalResult("ACCEPTED", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {}
@ -186,6 +186,6 @@ async def decline_proposal(_, info, id):
session.commit()
result = ProposalResult("DECLINED", proposal)
await ProposalSubscriptions.put(result)
await ProposalStorage.put(result)
return {}

View File

@ -1,5 +1,6 @@
from orm import Comment, CommentRating
from orm.base import local_session
from orm.shout import ShoutCommentsSubscription
from resolvers.base import mutation, query, subscription
from auth.authenticate import login_required
import asyncio
@ -10,37 +11,51 @@ class CommentResult:
self.status = status
self.comment = comment
class CommentSubscription:
class ShoutCommentsSubscription:
queue = asyncio.Queue()
def __init__(self, shout_slug):
self.shout_slug = shout_slug
#TODO: one class for MessageSubscription and CommentSubscription
class CommentSubscriptions:
class ShoutCommentsStorage:
lock = asyncio.Lock()
subscriptions = []
@staticmethod
async def register_subscription(subs):
self = CommentSubscriptions
self = ShoutCommentsStorage
async with self.lock:
self.subscriptions.append(subs)
@staticmethod
async def del_subscription(subs):
self = CommentSubscriptions
self = ShoutCommentsStorage
async with self.lock:
self.subscriptions.remove(subs)
@staticmethod
async def put(comment_result):
self = CommentSubscriptions
self = ShoutCommentsStorage
async with self.lock:
for subs in self.subscriptions:
if comment_result.comment.shout == subs.shout_slug:
subs.queue.put_nowait(comment_result)
def comments_subscribe(user, slug):
ShoutCommentsSubscription.create(
subscriber = user.slug,
shout = slug)
def comments_unsubscribe(user, slug):
with local_session() as session:
sub = session.query(ShoutCommentsSubscription).\
filter(and_(ShoutCommentsSubscription.subscriber == user.slug, ShoutCommentsSubscription.shout == slug)).\
first()
if not sub:
raise Exception("subscription not exist")
session.delete(sub)
session.commit()
@mutation.field("createComment")
@login_required
async def create_comment(_, info, body, shout, replyTo = None):
@ -55,7 +70,7 @@ async def create_comment(_, info, body, shout, replyTo = None):
)
result = CommentResult("NEW", comment)
await CommentSubscriptions.put(result)
await ShoutCommentsStorage.put(result)
return {"comment": comment}
@ -78,7 +93,7 @@ async def update_comment(_, info, id, body):
session.commit()
result = CommentResult("UPDATED", comment)
await CommentSubscriptions.put(result)
await ShoutCommentsStorage.put(result)
return {"comment": comment}
@ -99,7 +114,7 @@ async def delete_comment(_, info, id):
session.commit()
result = CommentResult("DELETED", comment)
await CommentSubscriptions.put(result)
await ShoutCommentsStorage.put(result)
return {}
@ -127,21 +142,6 @@ async def rate_comment(_, info, id, value):
value = value)
result = CommentResult("UPDATED_RATING", comment)
await CommentSubscriptions.put(result)
await ShoutCommentsStorage.put(result)
return {}
@subscription.source("commentUpdated")
async def comment_generator(obj, info, shout):
try:
subs = CommentSubscription(shout)
await CommentSubscriptions.register_subscription(subs)
while True:
result = await subs.queue.get()
yield result
finally:
await CommentSubscriptions.del_subscription(subs)
@subscription.field("commentUpdated")
def comment_resolver(result, info, shout):
return result

View File

@ -36,7 +36,7 @@ async def create_shout(_, info, input):
"new shout %s" % (new_shout.slug)
)
await ShoutSubscriptions.send_shout(new_shout)
await ShoutCommentsStorage.send_shout(new_shout)
return {
"shout" : new_shout

View File

@ -4,8 +4,6 @@ from orm.comment import Comment
from orm.base import local_session
from orm.topic import Topic, TopicSubscription
from resolvers.base import mutation, query, subscription
from resolvers.topics import topic_subscribe, topic_unsubscribe
from resolvers.community import community_subscribe, community_unsubscribe, get_subscribed_communities
from auth.authenticate import login_required
from inbox_resolvers.inbox import get_total_unread_messages_for_user
@ -205,25 +203,16 @@ async def shouts_reviewed(_, info, page, size):
return shouts
@query.field("shoutsSubscribed")
@query.field("shoutCommentsSubscribed")
@login_required
async def shouts_subscribed(_, info, page, size):
async def shout_comments_subscribed(_, info, slug, page, size):
user = info.context["request"].user
with local_session() as session:
shouts_by_topic = session.query(Shout).\
join(ShoutTopic).\
join(TopicSubscription, ShoutTopic.topic == TopicSubscription.topic).\
where(TopicSubscription.subscriber == user.slug)
shouts_by_author = session.query(Shout).\
join(ShoutAuthor).\
join(AuthorSubscription, ShoutAuthor.user == AuthorSubscription.author).\
where(AuthorSubscription.subscriber == user.slug)
shouts_by_community = session.query(Shout).\
join(Community).\
join(CommunitySubscription).\
where(CommunitySubscription.subscriber == user.slug)
shouts = shouts_by_topic.union(shouts_by_author).\
union(shouts_by_community).\
comments_by_shout = session.query(Comment).\
join(ShoutCommentsSubscription).\
join(ShoutCommentsSubscription, ShoutCommentsSubscription.shout == slug).\
where(ShoutCommentsSubscription.subscriber == user.slug)
comments = comments_by_shout.\
order_by(desc(Shout.createdAt)).\
limit(size).\
offset( (page - 1) * size)

View File

@ -1,12 +1,13 @@
from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\
ShoutRatingStorage, ShoutViewStorage, Comment, CommentRating, Topic
from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, \
User, Community, Resource, ShoutRatingStorage, ShoutViewStorage, \
Comment, CommentRating, Topic, ShoutCommentsSubscription
from orm.community import CommunitySubscription
from orm.base import local_session
from orm.user import UserStorage, AuthorSubscription
from orm.topic import TopicSubscription
from resolvers.base import mutation, query
from resolvers.comments import comments_subscribe, comments_unsubscribe
from auth.authenticate import login_required
from settings import SHOUTS_REPO
@ -207,26 +208,6 @@ class ShoutsCache:
print("shouts cache worker error = %s" % (err))
await asyncio.sleep(ShoutsCache.period)
class ShoutSubscriptions:
lock = asyncio.Lock()
subscriptions = []
@staticmethod
async def register_subscription(subs):
async with ShoutSubscriptions.lock:
ShoutSubscriptions.subscriptions.append(subs)
@staticmethod
async def del_subscription(subs):
async with ShoutSubscriptions.lock:
ShoutSubscriptions.subscriptions.remove(subs)
@staticmethod
async def send_shout(shout):
async with ShoutSubscriptions.lock:
for subs in ShoutSubscriptions.subscriptions:
subs.put_nowait(shout)
@query.field("topViewed")
async def top_viewed(_, info, page, size):
async with ShoutsCache.lock:
@ -344,6 +325,8 @@ async def subscribe(_, info, subscription, slug):
topic_subscribe(user, slug)
elif subscription == "COMMUNITY":
community_subscribe(user, slug)
elif comments_subscription == "COMMENTS":
comments_subscribe(user, slug)
except Exception as e:
return {"error" : e}
@ -361,6 +344,8 @@ async def unsubscribe(_, info, subscription, slug):
topic_unsubscribe(user, slug)
elif subscription == "COMMUNITY":
community_unsubscribe(user, slug)
elif subscription == "COMMENTS":
comments_unsubscribe(user, slug)
except Exception as e:
return {"error" : e}

View File

@ -201,7 +201,7 @@ type Query {
getCommunity(slug: String): Community!
getCommunities: [Community]!
shoutsSubscribed(page: Int!, size: Int!): [Shout]!
shoutCommentsSubscribed(slug: String!, page: Int!, size: Int!): [Shout]!
shoutsReviewed(page: Int!, size: Int!): [Shout]!
recentCommented(page: Int!, size: Int!): [Shout]!
}