core/resolvers/community.py

98 lines
3.5 KiB
Python
Raw Permalink Normal View History

2023-10-23 14:47:11 +00:00
from orm.author import Author
2024-10-21 07:52:23 +00:00
from orm.community import Community, CommunityFollower
2023-12-17 20:30:20 +00:00
from services.db import local_session
2024-10-21 13:42:30 +00:00
from services.schema import mutation, query
2024-01-25 19:41:27 +00:00
2024-04-17 15:32:23 +00:00
@query.field("get_communities_all")
async def get_communities_all(_, _info):
    """Return every community stored in the database.

    The session is used as a context manager, like the mutations in this
    module, so it is closed as soon as the rows are loaded instead of being
    left open until garbage collection.

    Returns:
        A list of ``Community`` rows (empty when there are none).
    """
    # NOTE(review): rows are returned after the session is closed; confirm the
    # serializer does not rely on lazily loaded attributes.
    with local_session() as session:
        return session.query(Community).all()
2023-10-23 14:47:11 +00:00
2024-04-17 15:32:23 +00:00
@query.field("get_community")
async def get_community(_, _info, slug: str):
    """Return the community whose slug equals ``slug``.

    The session is used as a context manager, like the mutations in this
    module, so it is closed once the lookup is done instead of being left
    open until garbage collection.

    Args:
        slug: Slug of the community to look up.

    Returns:
        The first matching ``Community`` row, or ``None`` when no community
        has that slug.
    """
    # NOTE(review): the row is returned after the session is closed; confirm
    # the serializer does not rely on lazily loaded attributes.
    with local_session() as session:
        return session.query(Community).where(Community.slug == slug).first()
2023-10-23 14:47:11 +00:00
2024-10-21 07:52:23 +00:00
@query.field("get_communities_by_author")
async def get_communities_by_author(_, _info, slug="", user="", author_id=0):
    """Return the communities followed by a single author.

    The author may be identified by ``slug``, by ``user`` or directly by
    ``author_id``. ``slug`` and ``user`` are resolved to author ids first and
    take precedence over an explicitly passed ``author_id``.

    Args:
        slug: Author slug; ignored when empty.
        user: Author ``user`` value; ignored when empty.
        author_id: Author id, used only when neither ``slug`` nor ``user``
            is given.

    Returns:
        A list of ``Community`` rows. The list is empty when no author was
        specified, when ``slug``/``user`` match no author, or when ``slug``
        and ``user`` resolve to different authors.
    """
    with local_session() as session:
        resolved_ids = set()
        for column, value in ((Author.slug, slug), (Author.user, user)):
            if not value:
                continue
            author = session.query(Author).where(column == value).first()
            if author is None:
                # Unknown author: nothing to list (previously an AttributeError).
                return []
            resolved_ids.add(author.id)

        if len(resolved_ids) > 1:
            # A follower row has exactly one author, so filtering on two
            # different ids at once can never match anything.
            return []
        if resolved_ids:
            author_id = resolved_ids.pop()
        if not author_id:
            return []

        return (
            session.query(Community)
            .join(CommunityFollower)
            .where(CommunityFollower.author == author_id)
            .all()
        )
2024-10-21 13:42:30 +00:00
@mutation.field("join_community")
async def join_community(_, info, slug: str):
    """Make the current author a follower of the community with ``slug``.

    Args:
        info: Resolver info; ``info.context["author"]["id"]`` identifies the
            caller.
        slug: Slug of the community to join.

    Returns:
        ``{"ok": True}`` on success (also when the author already follows the
        community), otherwise ``{"ok": False, "error": <message>}``.
    """
    # `or {}` also covers a context that stores an explicit None author.
    author_dict = info.context.get("author") or {}
    author_id = author_dict.get("id")
    if not author_id:
        return {"ok": False, "error": "Unauthorized"}

    with local_session() as session:
        community = session.query(Community).where(Community.slug == slug).first()
        if not community:
            return {"ok": False, "error": "Community not found"}

        already_following = (
            session.query(CommunityFollower)
            .where(
                CommunityFollower.author == author_id,
                CommunityFollower.community == community.id,
            )
            .first()
        )
        # Joining twice must not create a second follower row.
        if not already_following:
            session.add(CommunityFollower(community=community.id, author=author_id))
            session.commit()
        return {"ok": True}
@mutation.field("leave_community")
async def leave_community(_, info, slug: str):
    """Remove the current author from the followers of a community.

    Follower rows reference the community by id (``join_community`` stores
    ``community.id``), so the slug is resolved to a community first; comparing
    the column with the slug itself would never match a row.

    Args:
        info: Resolver info; ``info.context["author"]["id"]`` identifies the
            caller.
        slug: Slug of the community to leave.

    Returns:
        ``{"ok": True}`` once no follower row remains for the author (also
        when there was none), otherwise ``{"ok": False, "error": <message>}``.
    """
    # `or {}` also covers a context that stores an explicit None author.
    author_dict = info.context.get("author") or {}
    author_id = author_dict.get("id")
    if not author_id:
        return {"ok": False, "error": "Unauthorized"}

    with local_session() as session:
        community = session.query(Community).where(Community.slug == slug).first()
        if not community:
            return {"ok": False, "error": "Community not found"}

        session.query(CommunityFollower).where(
            CommunityFollower.author == author_id,
            CommunityFollower.community == community.id,
        ).delete()
        session.commit()
        return {"ok": True}
@mutation.field("create_community")
async def create_community(_, info, community_data):
    """Create a community owned by the current author.

    Args:
        info: Resolver info; ``info.context["author"]["id"]`` identifies the
            caller.
        community_data: Mapping of ``Community`` field names to values.

    Returns:
        ``{"ok": True}`` on success, otherwise
        ``{"ok": False, "error": <message>}``.
    """
    # `or {}` also covers a context that stores an explicit None author.
    author_dict = info.context.get("author") or {}
    author_id = author_dict.get("id")
    if not author_id:
        return {"ok": False, "error": "Unauthorized"}

    # The owner column is `created_by` (update_community and delete_community
    # filter on Community.created_by). It is applied last so a client-supplied
    # value cannot override the authenticated author.
    fields = {**community_data, "created_by": author_id}

    with local_session() as session:
        try:
            session.add(Community(**fields))
            session.commit()
        except Exception as e:
            # Same error reporting style as update_community/delete_community.
            return {"ok": False, "error": str(e)}
    return {"ok": True}
@mutation.field("update_community")
async def update_community(_, info, community_data):
    """Update a community created by the current author.

    The community is selected by ``community_data["slug"]`` and must have been
    created by the caller (``Community.created_by``).

    Args:
        info: Resolver info; ``info.context["author"]["id"]`` identifies the
            caller.
        community_data: Mapping of ``Community`` field names to new values;
            must contain ``slug``.

    Returns:
        ``{"ok": True}`` when a row was updated, otherwise
        ``{"ok": False, "error": <message>}``.
    """
    # `or {}` also covers a context that stores an explicit None author.
    author_dict = info.context.get("author") or {}
    author_id = author_dict.get("id")
    slug = community_data.get("slug")
    if not slug:
        return {"ok": False, "error": "Please, set community slug in input"}
    if not author_id:
        return {"ok": False, "error": "Unauthorized"}

    # NOTE(review): community_data is applied as-is; confirm the input schema
    # does not allow overwriting columns such as `created_by`.
    with local_session() as session:
        try:
            updated_rows = (
                session.query(Community)
                .where(Community.created_by == author_id, Community.slug == slug)
                .update(community_data)
            )
            session.commit()
        except Exception as e:
            return {"ok": False, "error": str(e)}

    # update() reports how many rows matched; zero means the slug is unknown
    # or the community belongs to someone else, which is not a success.
    if not updated_rows:
        return {"ok": False, "error": "Community not found or not owned by the author"}
    return {"ok": True}
@mutation.field("delete_community")
async def delete_community(_, info, slug: str):
    """Delete a community created by the current author.

    Args:
        info: Resolver info; ``info.context["author"]["id"]`` identifies the
            caller.
        slug: Slug of the community to delete.

    Returns:
        ``{"ok": True}`` when a row was deleted, otherwise
        ``{"ok": False, "error": <message>}``.
    """
    # `or {}` also covers a context that stores an explicit None author.
    author_dict = info.context.get("author") or {}
    author_id = author_dict.get("id")
    if not author_id:
        return {"ok": False, "error": "Unauthorized"}

    with local_session() as session:
        try:
            deleted_rows = (
                session.query(Community)
                .where(Community.slug == slug, Community.created_by == author_id)
                .delete()
            )
            session.commit()
        except Exception as e:
            return {"ok": False, "error": str(e)}

    # delete() reports how many rows matched; zero means the slug is unknown
    # or the community belongs to someone else, which is not a success.
    if not deleted_rows:
        return {"ok": False, "error": "Community not found or not owned by the author"}
    return {"ok": True}