core/services/schema.py

42 lines
1.3 KiB
Python
Raw Normal View History

2025-01-26 15:01:04 +00:00
from asyncio.log import logger
import httpx
2025-01-26 14:59:08 +00:00
from ariadne import MutationType, QueryType
2025-01-26 15:01:04 +00:00
from settings import AUTH_URL
2023-10-06 09:51:07 +00:00
query = QueryType()
mutation = MutationType()
2024-02-19 07:14:14 +00:00
resolvers = [query, mutation]
2025-01-26 15:01:04 +00:00
async def request_graphql_data(gql, url=AUTH_URL, headers=None):
"""
Выполняет GraphQL запрос к указанному URL
:param gql: GraphQL запрос
:param url: URL для запроса, по умолчанию AUTH_URL
:param headers: Заголовки запроса
:return: Результат запроса или None в случае ошибки
"""
if not url:
return None
if headers is None:
headers = {"Content-Type": "application/json"}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=gql, headers=headers)
if response.status_code == 200:
data = response.json()
errors = data.get("errors")
if errors:
logger.error(f"{url} response: {data}")
else:
return data
else:
logger.error(f"{url}: {response.status_code} {response.text}")
except Exception as _e:
import traceback
logger.error(f"request_graphql_data error: {traceback.format_exc()}")
return None