inbox/services/core.py
Untone 83fba058ab
All checks were successful
deploy / deploy (push) Successful in 1m4s
authors-all-fix-2
2023-12-18 20:38:31 +03:00

60 lines
1.7 KiB
Python

from typing import Any, List
import aiohttp
from aiohttp import ClientResponse, ClientSession
from models.member import ChatMember
from settings import API_BASE
# Default HTTP headers passed to every POST issued by `_request_endpoint`.
headers = {"Content-Type": "application/json"}
async def _request_endpoint(query_name, body) -> Any:
    """POST a GraphQL request to ``API_BASE`` and return the ``query_name`` payload.

    A new ``aiohttp.ClientSession`` is opened for each call. The response
    status and raw body text are printed for debugging.

    Args:
        query_name: Name of the root field to extract from the response's
            ``data`` object; also used as a label in the debug output.
        body: JSON-serializable request body (sent via ``json=``).

    Returns:
        The value stored under ``data[query_name]`` when the server answers
        with HTTP 200 and a JSON object; ``{}`` when that key (or ``data``
        itself) is missing or null; ``[]`` for any non-200 status, an empty
        JSON body, or a JSON body that is not an object.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(API_BASE, headers=headers, json=body) as response:
            print(f"[services.core] {query_name} response: <{response.status}> {await response.text()}")
            if response.status == 200:
                r = await response.json()
                if r and isinstance(r, dict):
                    # A failed GraphQL query is reported as `"data": null`, so
                    # `.get("data", {})` would hand back None and the chained
                    # `.get` would raise AttributeError. Treat null as empty.
                    data = r.get("data") or {}
                    if isinstance(data, dict):
                        return data.get(query_name, {})
    return []
async def get_all_authors(limit: int = 50, offset: int = 0) -> List[ChatMember]:
    """Fetch one page of authors through the ``load_authors_all`` query.

    Args:
        limit: Value interpolated into the query's ``limit`` argument.
        offset: Value interpolated into the query's ``offset`` argument.

    Returns:
        The list returned by the API (each item requested with the fields
        ``id slug pic name``), or an empty list when the request failed or
        the field came back null/missing.
    """
    query_name = "load_authors_all"
    gql = {
        "query": f"query {{ {query_name}(limit: {limit!s}, offset: {offset!s}) {{ id slug pic name }} }}",
        "variables": None,
    }
    authors = await _request_endpoint(query_name, gql)
    # The helper can yield None (null field) or {} (missing field); callers
    # are promised a list, so normalize anything else to an empty one.
    return authors if isinstance(authors, list) else []
async def get_my_followed() -> List[ChatMember]:
    """Fetch the followed authors through the ``get_my_followed`` query.

    Returns:
        The ``authors`` list from the query result (each item requested with
        the fields ``id slug pic name``), or an empty list when the request
        failed or the result carries no authors.
    """
    # NOTE(review): the request is sent with only the module-level `headers`
    # (Content-Type); confirm how the API identifies the current user.
    query_name = "get_my_followed"
    gql = {
        "query": f"query {{ {query_name} {{ authors {{ id slug pic name }} }} }}",
        "variables": None,
    }
    result = await _request_endpoint(query_name, gql)
    # `_request_endpoint` falls back to [] on non-200 responses and may pass
    # through None for a null field; neither supports `.get`.
    if not isinstance(result, dict):
        return []
    # `authors` may be present but null, so do not rely on the .get default.
    return result.get("authors") or []
async def get_author(user: str = ""):
    """Look up a single author through the ``get_author_id`` query.

    The ``user`` value is passed as the ``$user`` GraphQL variable of the
    ``GetAuthorId`` operation.

    Returns:
        The query result; when the API answers with a non-empty list, its
        last element is returned instead. Any falsy result becomes ``None``.
    """
    operation = "GetAuthorId"
    query_name = "get_author_id"
    query_text = f"query {operation}($user: String!) {{ {query_name}(user: $user) {{ id slug pic name }} }}"
    payload = {
        "query": query_text,
        "operationName": operation,
        "variables": {"user": user},
    }
    found = await _request_endpoint(query_name, payload)
    # A list response is collapsed to its last entry.
    if isinstance(found, list) and found:
        found = found[-1]
    return found if found else None