Merge branch 'main' of github.com:Discours/discours-backend
This commit is contained in:
commit
dcdfdf7dc3
|
@ -25,7 +25,7 @@ class Comment(Base):
|
|||
updatedBy = Column(ForeignKey("user.id"), nullable=True, comment="Last Editor")
|
||||
deletedAt = Column(DateTime, nullable=True, comment="Deleted at")
|
||||
deletedBy = Column(ForeignKey("user.id"), nullable=True, comment="Deleted by")
|
||||
shout: int = Column(ForeignKey("shout.slug"), nullable=False)
|
||||
shout = Column(ForeignKey("shout.slug"), nullable=False)
|
||||
replyTo: int = Column(ForeignKey("comment.id"), nullable=True, comment="comment ID")
|
||||
ratings = relationship(CommentRating, foreign_keys=CommentRating.comment_id)
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ class ShoutAuthor(Base):
|
|||
|
||||
id = None
|
||||
shout = Column(ForeignKey('shout.slug'), primary_key = True)
|
||||
user = Column(ForeignKey('user.id'), primary_key = True)
|
||||
user = Column(ForeignKey('user.slug'), primary_key = True)
|
||||
|
||||
class ShoutViewer(Base):
|
||||
__tablename__ = "shout_viewer"
|
||||
|
@ -198,7 +198,7 @@ class TopicStat:
|
|||
subs = session.query(TopicSubscription)
|
||||
for sub in subs:
|
||||
topic = sub.topic
|
||||
user = sub.user
|
||||
user = sub.subscriber
|
||||
if topic in self.subs_by_topic:
|
||||
self.subs_by_topic[topic].append(user)
|
||||
else:
|
||||
|
|
|
@ -9,8 +9,8 @@ class TopicSubscription(Base):
|
|||
__tablename__ = "topic_subscription"
|
||||
|
||||
id = None
|
||||
subscriber = Column(ForeignKey('user.slug'), primary_key = True)
|
||||
topic = Column(ForeignKey('topic.slug'), primary_key = True)
|
||||
user = Column(ForeignKey('user.id'), primary_key = True)
|
||||
createdAt: str = Column(DateTime, nullable=False, default = datetime.now, comment="Created at")
|
||||
|
||||
class Topic(Base):
|
||||
|
|
16
orm/user.py
16
orm/user.py
|
@ -33,6 +33,14 @@ class UserRole(Base):
|
|||
user_id = Column(ForeignKey('user.id'), primary_key = True)
|
||||
role_id = Column(ForeignKey('role.id'), primary_key = True)
|
||||
|
||||
class AuthorSubscription(Base):
|
||||
__tablename__ = "author_subscription"
|
||||
|
||||
id = None
|
||||
subscriber = Column(ForeignKey('user.slug'), primary_key = True)
|
||||
author = Column(ForeignKey('user.slug'), primary_key = True)
|
||||
createdAt = Column(DateTime, nullable=False, default = datetime.now, comment="Created at")
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "user"
|
||||
|
||||
|
@ -96,6 +104,14 @@ class UserStorage:
|
|||
async with self.lock:
|
||||
return self.users.get(id)
|
||||
|
||||
@staticmethod
|
||||
async def get_user_by_slug(slug):
|
||||
self = UserStorage
|
||||
async with self.lock:
|
||||
for user in self.users.values():
|
||||
if user.slug == slug:
|
||||
return user
|
||||
|
||||
@staticmethod
|
||||
async def add_user(user):
|
||||
self = UserStorage
|
||||
|
|
|
@ -86,9 +86,11 @@ async def login(_, info: GraphQLResolveInfo, email: str, password: str = ""):
|
|||
with local_session() as session:
|
||||
orm_user = session.query(User).filter(User.email == email).first()
|
||||
if orm_user is None:
|
||||
print(f"signIn {email}: invalid email")
|
||||
return {"error" : "invalid email"}
|
||||
|
||||
if not password:
|
||||
print(f"signIn {email}: send auth email")
|
||||
await send_auth_email(orm_user)
|
||||
return {}
|
||||
|
||||
|
@ -101,9 +103,11 @@ async def login(_, info: GraphQLResolveInfo, email: str, password: str = ""):
|
|||
try:
|
||||
user = Identity.identity(orm_user, password)
|
||||
except InvalidPassword:
|
||||
print(f"signIn {email}: invalid password")
|
||||
return {"error" : "invalid password"}
|
||||
|
||||
token = await Authorize.authorize(user, device=device, auto_delete=auto_delete)
|
||||
print(f"signIn {email}: OK")
|
||||
return {"token" : token, "user": orm_user}
|
||||
|
||||
|
||||
|
|
|
@ -5,6 +5,42 @@ from auth.authenticate import login_required
|
|||
import asyncio
|
||||
from datetime import datetime
|
||||
|
||||
class CommentResult:
|
||||
def __init__(self, status, comment):
|
||||
self.status = status
|
||||
self.comment = comment
|
||||
|
||||
class CommentSubscription:
|
||||
queue = asyncio.Queue()
|
||||
|
||||
def __init__(self, shout_slug):
|
||||
self.shout_slug = shout_slug
|
||||
|
||||
#TODO: one class for MessageSubscription and CommentSubscription
|
||||
class CommentSubscriptions:
|
||||
lock = asyncio.Lock()
|
||||
subscriptions = []
|
||||
|
||||
@staticmethod
|
||||
async def register_subscription(subs):
|
||||
self = CommentSubscriptions
|
||||
async with self.lock:
|
||||
self.subscriptions.append(subs)
|
||||
|
||||
@staticmethod
|
||||
async def del_subscription(subs):
|
||||
self = CommentSubscriptions
|
||||
async with self.lock:
|
||||
self.subscriptions.remove(subs)
|
||||
|
||||
@staticmethod
|
||||
async def put(comment_result):
|
||||
self = CommentSubscriptions
|
||||
async with self.lock:
|
||||
for subs in self.subscriptions:
|
||||
if comment_result.comment.shout == subs.shout_slug:
|
||||
subs.queue.put_nowait(comment_result)
|
||||
|
||||
@mutation.field("createComment")
|
||||
@login_required
|
||||
async def create_comment(_, info, body, shout, replyTo = None):
|
||||
|
@ -18,6 +54,9 @@ async def create_comment(_, info, body, shout, replyTo = None):
|
|||
replyTo = replyTo
|
||||
)
|
||||
|
||||
result = CommentResult("NEW", comment)
|
||||
await CommentSubscriptions.put(result)
|
||||
|
||||
return {"comment": comment}
|
||||
|
||||
@mutation.field("updateComment")
|
||||
|
@ -38,6 +77,11 @@ async def update_comment(_, info, id, body):
|
|||
|
||||
session.commit()
|
||||
|
||||
result = CommentResult("UPDATED", comment)
|
||||
await CommentSubscriptions.put(result)
|
||||
|
||||
return {"comment": comment}
|
||||
|
||||
@mutation.field("deleteComment")
|
||||
@login_required
|
||||
async def delete_comment(_, info, id):
|
||||
|
@ -54,6 +98,9 @@ async def delete_comment(_, info, id):
|
|||
comment.deletedAt = datetime.now()
|
||||
session.commit()
|
||||
|
||||
result = CommentResult("DELETED", comment)
|
||||
await CommentSubscriptions.put(result)
|
||||
|
||||
return {}
|
||||
|
||||
@mutation.field("rateComment")
|
||||
|
@ -63,16 +110,38 @@ async def rate_comment(_, info, id, value):
|
|||
user_id = auth.user_id
|
||||
|
||||
with local_session() as session:
|
||||
comment = session.query(Comment).filter(Comment.id == id).first()
|
||||
if not comment:
|
||||
return {"error": "invalid comment id"}
|
||||
|
||||
rating = session.query(CommentRating).\
|
||||
filter(CommentRating.comment_id == id and CommentRating.createdBy == user_id).first()
|
||||
if rating:
|
||||
rating.value = value
|
||||
session.commit()
|
||||
return {}
|
||||
|
||||
CommentRating.create(
|
||||
comment_id = id,
|
||||
createdBy = user_id,
|
||||
value = value)
|
||||
|
||||
if not rating:
|
||||
CommentRating.create(
|
||||
comment_id = id,
|
||||
createdBy = user_id,
|
||||
value = value)
|
||||
|
||||
result = CommentResult("UPDATED_RATING", comment)
|
||||
await CommentSubscriptions.put(result)
|
||||
|
||||
return {}
|
||||
|
||||
@subscription.source("commentUpdated")
|
||||
async def comment_generator(obj, info, shout):
|
||||
try:
|
||||
subs = CommentSubscription(shout)
|
||||
await CommentSubscriptions.register_subscription(subs)
|
||||
while True:
|
||||
result = await subs.queue.get()
|
||||
yield result
|
||||
finally:
|
||||
await CommentSubscriptions.del_subscription(subs)
|
||||
|
||||
@subscription.field("commentUpdated")
|
||||
def comment_resolver(result, info, shout):
|
||||
return result
|
||||
|
|
|
@ -60,20 +60,25 @@ async def create_chat(_, info, description):
|
|||
|
||||
return { "chatId" : chat_id }
|
||||
|
||||
@query.field("enterChat")
|
||||
@login_required
|
||||
async def enter_chat(_, info, chatId):
|
||||
chat = await redis.execute("GET", f"chats/{chatId}")
|
||||
if not chat:
|
||||
return { "error" : "chat not exist" }
|
||||
chat = json.loads(chat)
|
||||
|
||||
message_ids = await redis.lrange(f"chats/{chatId}/message_ids", 0, 10)
|
||||
async def load_messages(chatId, size, page):
|
||||
message_ids = await redis.lrange(f"chats/{chatId}/message_ids",
|
||||
size * (page -1), size * page - 1)
|
||||
messages = []
|
||||
if message_ids:
|
||||
message_keys = [f"chats/{chatId}/messages/{id.decode('UTF-8')}" for id in message_ids]
|
||||
messages = await redis.mget(*message_keys)
|
||||
messages = [json.loads(msg) for msg in messages]
|
||||
return messages
|
||||
|
||||
@query.field("enterChat")
|
||||
@login_required
|
||||
async def enter_chat(_, info, chatId, size):
|
||||
chat = await redis.execute("GET", f"chats/{chatId}")
|
||||
if not chat:
|
||||
return { "error" : "chat not exist" }
|
||||
chat = json.loads(chat)
|
||||
|
||||
messages = await load_messages(chatId, size, 1)
|
||||
|
||||
return {
|
||||
"chat" : chat,
|
||||
|
@ -112,25 +117,14 @@ async def create_message(_, info, chatId, body, replyTo = None):
|
|||
|
||||
@query.field("getMessages")
|
||||
@login_required
|
||||
async def get_messages(_, info, count, page):
|
||||
auth = info.context["request"].auth
|
||||
user_id = auth.user_id
|
||||
|
||||
with local_session() as session:
|
||||
messages = session.query(Message).filter(Message.author == user_id)
|
||||
|
||||
return messages
|
||||
async def get_messages(_, info, chatId, size, page):
|
||||
chat = await redis.execute("GET", f"chats/{chatId}")
|
||||
if not chat:
|
||||
return { "error" : "chat not exist" }
|
||||
|
||||
def check_and_get_message(message_id, user_id, session) :
|
||||
message = session.query(Message).filter(Message.id == message_id).first()
|
||||
|
||||
if not message :
|
||||
raise Exception("invalid id")
|
||||
|
||||
if message.author != user_id :
|
||||
raise Exception("access denied")
|
||||
|
||||
return message
|
||||
messages = await load_messages(chatId, size, page)
|
||||
|
||||
return messages
|
||||
|
||||
@mutation.field("updateMessage")
|
||||
@login_required
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
from orm import User, UserRole, Role, UserRating
|
||||
from orm.user import AuthorSubscription
|
||||
from orm.base import local_session
|
||||
from resolvers.base import mutation, query, subscription
|
||||
from auth.authenticate import login_required
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import func, and_
|
||||
from sqlalchemy.orm import selectinload
|
||||
import asyncio
|
||||
|
||||
|
@ -46,3 +47,31 @@ async def update_profile(_, info, profile):
|
|||
session.commit()
|
||||
|
||||
return {}
|
||||
|
||||
@mutation.field("authorSubscribe")
|
||||
@login_required
|
||||
async def author_subscribe(_, info, slug):
|
||||
user = info.context["request"].user
|
||||
|
||||
AuthorSubscription.create(
|
||||
subscriber = user.slug,
|
||||
author = slug
|
||||
)
|
||||
|
||||
return {}
|
||||
|
||||
@mutation.field("authorUnsubscribe")
|
||||
@login_required
|
||||
async def author_unsubscribe(_, info, slug):
|
||||
user = info.context["request"].user
|
||||
|
||||
with local_session() as session:
|
||||
sub = session.query(AuthorSubscription).\
|
||||
filter(and_(AuthorSubscription.subscriber == user.slug, AuthorSubscription.author == slug)).\
|
||||
first()
|
||||
if not sub:
|
||||
return { "error" : "subscription not exist" }
|
||||
session.delete(sub)
|
||||
session.commit()
|
||||
|
||||
return {}
|
||||
|
|
|
@ -7,6 +7,8 @@ from resolvers.zine import ShoutSubscriptions
|
|||
from auth.authenticate import login_required
|
||||
import asyncio
|
||||
|
||||
from sqlalchemy import func, and_
|
||||
|
||||
@query.field("topicsBySlugs")
|
||||
async def topics_by_slugs(_, info, slugs = None):
|
||||
with local_session() as session:
|
||||
|
@ -62,27 +64,35 @@ async def update_topic(_, info, input):
|
|||
@mutation.field("topicSubscribe")
|
||||
@login_required
|
||||
async def topic_subscribe(_, info, slug):
|
||||
auth = info.context["request"].auth
|
||||
user_id = auth.user_id
|
||||
sub = TopicSubscription.create({ user: user_id, topic: slug })
|
||||
return {} # type Result
|
||||
user = info.context["request"].user
|
||||
|
||||
TopicSubscription.create(
|
||||
subscriber = user.slug,
|
||||
topic = slug)
|
||||
|
||||
return {}
|
||||
|
||||
@mutation.field("topicUnsubscribe")
|
||||
@login_required
|
||||
async def topic_unsubscribe(_, info, slug):
|
||||
auth = info.context["request"].auth
|
||||
user_id = auth.user_id
|
||||
sub = session.query(TopicSubscription).filter(TopicSubscription.user == user_id and TopicSubscription.topic == slug).first()
|
||||
user = info.context["request"].user
|
||||
|
||||
with local_session() as session:
|
||||
sub = session.query(TopicSubscription).\
|
||||
filter(and_(TopicSubscription.subscriber == user.slug, TopicSubscription.topic == slug)).\
|
||||
first()
|
||||
if not sub:
|
||||
return { "error" : "subscription not exist" }
|
||||
session.delete(sub)
|
||||
return {} # type Result
|
||||
return { "error": "session error" }
|
||||
session.commit()
|
||||
|
||||
return {}
|
||||
|
||||
@subscription.source("topicUpdated")
|
||||
async def new_shout_generator(obj, info, user_id):
|
||||
async def new_shout_generator(obj, info, user_slug):
|
||||
try:
|
||||
with local_session() as session:
|
||||
topics = session.query(TopicSubscription.topic).filter(TopicSubscription.user == user_id).all()
|
||||
topics = session.query(TopicSubscription.topic).filter(TopicSubscription.subscriber == user_slug).all()
|
||||
topics = set([item.topic for item in topics])
|
||||
shouts_queue = asyncio.Queue()
|
||||
await ShoutSubscriptions.register_subscription(shouts_queue)
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
from orm import Shout, ShoutAuthor, ShoutTopic, ShoutRating, ShoutViewByDay, User, Community, Resource,\
|
||||
ShoutRatingStorage, ShoutViewStorage, Comment, CommentRating, Topic
|
||||
from orm.base import local_session
|
||||
from orm.user import UserStorage
|
||||
from orm.user import UserStorage, AuthorSubscription
|
||||
from orm.topic import TopicSubscription
|
||||
|
||||
from resolvers.base import mutation, query
|
||||
|
||||
|
@ -222,7 +223,7 @@ async def create_shout(_, info, input):
|
|||
new_shout = Shout.create(**input)
|
||||
ShoutAuthor.create(
|
||||
shout = new_shout.slug,
|
||||
user = user.id)
|
||||
user = user.slug)
|
||||
|
||||
if "mainTopic" in input:
|
||||
topic_slugs.append(input["mainTopic"])
|
||||
|
@ -375,7 +376,7 @@ async def shouts_by_author(_, info, author, page, size):
|
|||
|
||||
shouts = session.query(Shout).\
|
||||
join(ShoutAuthor).\
|
||||
where(and_(ShoutAuthor.user == user.id, Shout.publishedAt != None)).\
|
||||
where(and_(ShoutAuthor.user == author, Shout.publishedAt != None)).\
|
||||
order_by(desc(Shout.publishedAt)).\
|
||||
limit(size).\
|
||||
offset(page * size)
|
||||
|
@ -396,3 +397,61 @@ async def shouts_by_community(_, info, community, page, size):
|
|||
limit(size).\
|
||||
offset(page * size)
|
||||
return shouts
|
||||
|
||||
@query.field("shoutsByUserSubscriptions")
|
||||
async def shouts_by_user_subscriptions(_, info, userSlug, page, size):
|
||||
user = await UserStorage.get_user_by_slug(userSlug)
|
||||
if not user:
|
||||
return
|
||||
|
||||
with local_session() as session:
|
||||
shouts_by_topic = session.query(Shout).\
|
||||
join(ShoutTopic).\
|
||||
join(TopicSubscription, ShoutTopic.topic == TopicSubscription.topic).\
|
||||
where(and_(Shout.publishedAt != None, TopicSubscription.subscriber == userSlug))
|
||||
shouts_by_author = session.query(Shout).\
|
||||
join(ShoutAuthor).\
|
||||
join(AuthorSubscription, ShoutAuthor.user == AuthorSubscription.author).\
|
||||
where(and_(Shout.publishedAt != None, AuthorSubscription.subscriber == userSlug))
|
||||
shouts = shouts_by_topic.union(shouts_by_author).\
|
||||
order_by(desc(Shout.publishedAt)).\
|
||||
limit(size).\
|
||||
offset( (page - 1) * size)
|
||||
|
||||
return shouts
|
||||
|
||||
@query.field("shoutsByUserRatingOrComment")
|
||||
async def shouts_by_user_rating_or_comment(_, info, userSlug, page, size):
|
||||
user = await UserStorage.get_user_by_slug(userSlug)
|
||||
if not user:
|
||||
return
|
||||
|
||||
with local_session() as session:
|
||||
shouts_by_rating = session.query(Shout).\
|
||||
join(ShoutRating).\
|
||||
where(and_(Shout.publishedAt != None, ShoutRating.rater == userSlug))
|
||||
shouts_by_comment = session.query(Shout).\
|
||||
join(Comment).\
|
||||
where(and_(Shout.publishedAt != None, Comment.author == user.id))
|
||||
shouts = shouts_by_rating.union(shouts_by_comment).\
|
||||
order_by(desc(Shout.publishedAt)).\
|
||||
limit(size).\
|
||||
offset( (page - 1) * size)
|
||||
|
||||
return shouts
|
||||
|
||||
@query.field("newShoutsWithoutRating")
|
||||
async def new_shouts_without_rating(_, info, userSlug, size):
|
||||
user = await UserStorage.get_user_by_slug(userSlug)
|
||||
if not user:
|
||||
return
|
||||
|
||||
#TODO: postgres heavy load
|
||||
with local_session() as session:
|
||||
shouts = session.query(Shout).distinct().\
|
||||
outerjoin(ShoutRating).\
|
||||
where(and_(Shout.publishedAt != None, ShoutRating.rater != userSlug)).\
|
||||
order_by(desc(Shout.publishedAt)).\
|
||||
limit(size)
|
||||
|
||||
return shouts
|
||||
|
|
|
@ -95,6 +95,19 @@ type TopicResult {
|
|||
topic: Topic
|
||||
}
|
||||
|
||||
enum CommentStatus {
|
||||
NEW
|
||||
UPDATED
|
||||
UPDATED_RATING
|
||||
DELETED
|
||||
}
|
||||
|
||||
type CommentUpdatedResult {
|
||||
error: String
|
||||
status: CommentStatus
|
||||
comment: Comment
|
||||
}
|
||||
|
||||
################################### Mutation
|
||||
|
||||
type Mutation {
|
||||
|
@ -137,6 +150,9 @@ type Mutation {
|
|||
createCommunity(title: String!, desc: String!): Community!
|
||||
updateCommunity(community: CommunityInput!): Community!
|
||||
deleteCommunity(id: Int!): Result!
|
||||
|
||||
authorSubscribe(slug: String!): Result!
|
||||
authorUnsubscribe(slug: String!): Result!
|
||||
}
|
||||
|
||||
################################### Query
|
||||
|
@ -154,7 +170,8 @@ type Query {
|
|||
getUserRoles(slug: String!): [Role]!
|
||||
|
||||
# messages
|
||||
getMessages(count: Int = 100, page: Int = 1): [Message!]!
|
||||
enterChat(chatId: String!, size: Int = 50): EnterChatResult!
|
||||
getMessages(chatId: String!, size: Int!, page: Int!): [Message]!
|
||||
|
||||
# shouts
|
||||
getShoutBySlug(slug: String!): Shout!
|
||||
|
@ -180,8 +197,9 @@ type Query {
|
|||
getCommunity(slug: String): Community!
|
||||
getCommunities: [Community]!
|
||||
|
||||
#messages
|
||||
enterChat(chatId: String!): EnterChatResult!
|
||||
shoutsByUserSubscriptions(userSlug: String!, page: Int!, size: Int!): [Shout]!
|
||||
shoutsByUserRatingOrComment(userSlug: String!, page: Int!, size: Int!): [Shout]!
|
||||
newShoutsWithoutRating(userSlug: String!, size: Int = 10): [Shout]!
|
||||
}
|
||||
|
||||
############################################ Subscription
|
||||
|
@ -191,7 +209,8 @@ type Subscription {
|
|||
onlineUpdated: [User!]!
|
||||
shoutUpdated: Shout!
|
||||
userUpdated: User!
|
||||
topicUpdated(user_id: Int!): Shout!
|
||||
topicUpdated(user_slug: String!): Shout!
|
||||
commentUpdated(shout: String!): CommentUpdatedResult!
|
||||
}
|
||||
|
||||
############################################ Entities
|
||||
|
|
Loading…
Reference in New Issue
Block a user