aiohttp-try
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from functools import wraps
|
||||
from httpx import AsyncClient, HTTPError
|
||||
import aiohttp
|
||||
from aiohttp.web import HTTPUnauthorized
|
||||
from settings import AUTH_URL
|
||||
|
||||
|
||||
@@ -19,19 +20,19 @@ async def check_auth(req):
|
||||
"variables": None,
|
||||
}
|
||||
|
||||
async with AsyncClient(timeout=30.0) as client:
|
||||
response = await client.post(AUTH_URL, headers=headers, json=gql)
|
||||
print(f"[services.auth] response: {response.status_code} {response.text}")
|
||||
if response.status_code != 200:
|
||||
return False, None
|
||||
r = response.json()
|
||||
try:
|
||||
user_id = r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None)
|
||||
is_authenticated = user_id is not None
|
||||
return is_authenticated, user_id
|
||||
except Exception as e:
|
||||
print(f"{e}: {r}")
|
||||
return False, None
|
||||
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30.0)) as session:
|
||||
async with session.post(AUTH_URL, headers=headers, json=gql) as response:
|
||||
print(f"[services.auth] response: {response.status} {await response.text()}")
|
||||
if response.status != 200:
|
||||
return False, None
|
||||
r = await response.json()
|
||||
try:
|
||||
user_id = r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None)
|
||||
is_authenticated = user_id is not None
|
||||
return is_authenticated, user_id
|
||||
except Exception as e:
|
||||
print(f"{e}: {r}")
|
||||
return False, None
|
||||
|
||||
|
||||
def login_required(f):
|
||||
@@ -44,10 +45,10 @@ def login_required(f):
|
||||
if not is_authenticated:
|
||||
raise Exception("You are not logged in")
|
||||
else:
|
||||
# Добавляем author_id в контекст
|
||||
# Add user_id to the context
|
||||
context["user_id"] = user_id
|
||||
|
||||
# Если пользователь аутентифицирован, выполняем резолвер
|
||||
# If the user is authenticated, execute the resolver
|
||||
return await f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
@@ -59,7 +60,7 @@ def auth_request(f):
|
||||
req = args[0]
|
||||
is_authenticated, user_id = await check_auth(req)
|
||||
if not is_authenticated:
|
||||
raise HTTPError("please, login first")
|
||||
raise HTTPUnauthorized(text="Please, login first")
|
||||
else:
|
||||
req["user_id"] = user_id
|
||||
return await f(*args, **kwargs)
|
||||
|
@@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import httpx
|
||||
import aiohttp
|
||||
from services.rediscache import redis
|
||||
from orm.shout import Shout
|
||||
|
||||
@@ -20,13 +20,13 @@ class SearchService:
|
||||
cached = await redis.execute("GET", text)
|
||||
if not cached:
|
||||
async with SearchService.lock:
|
||||
# Use httpx to send a request to ElasticSearch
|
||||
async with httpx.AsyncClient() as client:
|
||||
# Use aiohttp to send a request to ElasticSearch
|
||||
async with aiohttp.ClientSession() as session:
|
||||
search_url = f"https://search.discours.io/search?q={text}"
|
||||
response = await client.get(search_url)
|
||||
if response.status_code == 200:
|
||||
payload = response.json()
|
||||
await redis.execute("SET", text, payload)
|
||||
return json.loads(payload)
|
||||
async with session.get(search_url) as response:
|
||||
if response.status == 200:
|
||||
payload = await response.json()
|
||||
await redis.execute("SET", text, json.dumps(payload))
|
||||
return payload
|
||||
else:
|
||||
return json.loads(cached)
|
||||
|
@@ -4,7 +4,7 @@ from datetime import timedelta, timezone, datetime
|
||||
from os import environ
|
||||
|
||||
from gql import Client, gql
|
||||
from gql.transport.httpx import HTTPXAsyncTransport
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
|
||||
from services.db import local_session
|
||||
from orm.topic import Topic
|
||||
@@ -44,13 +44,11 @@ token = environ.get("ACKEE_TOKEN", "")
|
||||
|
||||
|
||||
def create_client(headers=None, schema=None):
|
||||
return Client(
|
||||
schema=schema,
|
||||
transport=HTTPXAsyncTransport(
|
||||
url="https://ackee.discours.io/api",
|
||||
headers=headers,
|
||||
),
|
||||
transport = AIOHTTPTransport(
|
||||
url="https://ackee.discours.io/api",
|
||||
headers=headers,
|
||||
)
|
||||
return Client(schema=schema, transport=transport)
|
||||
|
||||
|
||||
class ViewedStorage:
|
||||
@@ -73,7 +71,7 @@ class ViewedStorage:
|
||||
async with self.lock:
|
||||
if token:
|
||||
self.client = create_client({"Authorization": "Bearer %s" % str(token)}, schema=schema_str)
|
||||
print("[services.viewed] * authorized permanentely by ackee.discours.io: %s" % token)
|
||||
print("[services.viewed] * authorized permanently by ackee.discours.io: %s" % token)
|
||||
else:
|
||||
print("[services.viewed] * please set ACKEE_TOKEN")
|
||||
self.disabled = True
|
||||
@@ -85,19 +83,20 @@ class ViewedStorage:
|
||||
start = time.time()
|
||||
self = ViewedStorage
|
||||
try:
|
||||
self.pages = await self.client.execute_async(load_pages)
|
||||
self.pages = self.pages["domains"][0]["statistics"]["pages"]
|
||||
shouts = {}
|
||||
try:
|
||||
for page in self.pages:
|
||||
p = page["value"].split("?")[0]
|
||||
slug = p.split("discours.io/")[-1]
|
||||
shouts[slug] = page["count"]
|
||||
for slug in shouts.keys():
|
||||
await ViewedStorage.increment(slug, shouts[slug])
|
||||
except Exception:
|
||||
pass
|
||||
print("[services.viewed] ⎪ %d pages collected " % len(shouts.keys()))
|
||||
async with self.client as session:
|
||||
self.pages = await session.execute(load_pages)
|
||||
self.pages = self.pages["domains"][0]["statistics"]["pages"]
|
||||
shouts = {}
|
||||
try:
|
||||
for page in self.pages:
|
||||
p = page["value"].split("?")[0]
|
||||
slug = p.split("discours.io/")[-1]
|
||||
shouts[slug] = page["count"]
|
||||
for slug in shouts.keys():
|
||||
await ViewedStorage.increment(slug, shouts[slug])
|
||||
except Exception:
|
||||
pass
|
||||
print("[services.viewed] ⎪ %d pages collected " % len(shouts.keys()))
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
@@ -108,7 +107,7 @@ class ViewedStorage:
|
||||
async def get_facts():
|
||||
self = ViewedStorage
|
||||
async with self.lock:
|
||||
return self.client.execute_async(load_facts)
|
||||
return await self.client.execute(load_facts)
|
||||
|
||||
@staticmethod
|
||||
async def get_shout(shout_slug):
|
||||
|
Reference in New Issue
Block a user