inbox/services/auth.py
2023-10-04 20:14:06 +03:00

94 lines
2.9 KiB
Python

from typing import Optional
from aiohttp.web import HTTPUnauthorized
from aiohttp.client import ClientSession
from pydantic import BaseModel
from functools import wraps
from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection
from services.db import local_session
from settings import AUTH_URL
from orm.author import Author
class AuthUser(BaseModel):
    """User identity attached to a connection by ``JWTAuthenticate``.

    Both fields carry an explicit ``None`` default so the model can be built
    from a user id alone (as ``JWTAuthenticate.authenticate`` does) regardless
    of the installed pydantic major version: pydantic v2 treats an
    ``Optional[...]`` annotation without a default as a required field.
    """

    # User id obtained from ``check_auth``; ``None`` when it returned no id.
    user_id: Optional[int] = None
    # NOTE(review): not populated anywhere in this module — confirm whether
    # another caller fills it in.
    username: Optional[str] = None
class AuthCredentials(BaseModel):
    """Credentials record returned by ``JWTAuthenticate.authenticate``.

    Every field has a default, so an instance can be created with no
    arguments; in this module only ``user_id`` and ``logged_in`` are passed.
    """

    # User id produced by ``check_auth``; ``None`` when no id was returned.
    user_id: Optional[int] = None
    # NOTE(review): not read or written in this module — presumably
    # permission scopes; confirm the expected dict shape with callers.
    scopes: Optional[dict] = {}
    # Set from the ``is_authenticated`` flag returned by ``check_auth``.
    logged_in: bool = False
    # NOTE(review): never set in this module; stays at the empty default here.
    error_message: str = ""
class JWTAuthenticate(AuthenticationBackend):
    """Authentication backend that delegates the token check to ``check_auth``."""

    async def authenticate(self, request: HTTPConnection):
        """Build the credentials/user pair for an incoming connection.

        Args:
            request: Connection whose headers are handed to ``check_auth``.

        Returns:
            An ``(AuthCredentials, AuthUser)`` tuple filled from the
            ``(is_authenticated, user_id)`` result of ``check_auth``.
        """
        is_logged_in, uid = await check_auth(request)
        credentials = AuthCredentials(user_id=uid, logged_in=is_logged_in)
        user = AuthUser(user_id=uid)
        return credentials, user
def _dig(node, *keys):
    """Follow *keys* through nested dicts; return ``None`` at the first miss."""
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


async def check_auth(req):
    """Validate the request's ``Authorization`` token against the auth service.

    Posts a GraphQL session request to ``AUTH_URL`` carrying the caller's
    token and extracts the user id from the JSON reply.

    Args:
        req: Request-like object exposing a ``headers`` mapping.

    Returns:
        A ``(is_authenticated, user_id)`` tuple. The result is
        ``(False, None)`` when the token is missing or empty, when the auth
        service answers with a non-200 status, or when the reply contains no
        user id (including replies where ``data`` or a nested object is
        ``null``).
    """
    token = req.headers.get("Authorization")
    if not token:
        # Nothing to validate, and a ``None`` header value must not be sent.
        return False, None

    is_v2 = "v2" in AUTH_URL
    # NOTE(review): the v2 payload is sent under a "mutation" key rather than
    # the conventional "query" key — confirm the auth service expects that.
    gql = (
        {"mutation": "{ getSession { user { id } } }"}
        if is_v2
        else {"query": "{ session { user { id } } }"}
    )
    headers = {"Authorization": token, "Content-Type": "application/json"}
    async with ClientSession(headers=headers) as session:
        # ``json=`` sends a JSON body matching the declared content type;
        # ``data=`` with a dict would be form-encoded instead.
        async with session.post(AUTH_URL, json=gql) as response:
            if response.status != 200:
                return False, None
            payload = await response.json()

    # A GraphQL reply is keyed by the selected field name, so the v2 request
    # is answered under "getSession"; "session" is still tried as a fallback
    # to stay compatible with what this function read before.
    root_fields = ("getSession", "session") if is_v2 else ("session",)
    user_id = None
    for root in root_fields:
        user_id = _dig(payload, "data", root, "user", "id")
        if user_id is not None:
            break
    return user_id is not None, user_id
async def author_id_by_user_id(user_id):
    """Return the ``id`` of the first ``Author`` whose ``user`` equals *user_id*.

    NOTE(review): the object yielded by ``local_session()`` is called directly
    (``session(Author)``) and entered with ``async with``; ``local_session`` is
    defined elsewhere, so confirm it supports both — a plain SQLAlchemy
    session would need ``session.query(Author)``.

    NOTE(review): the lookup result is dereferenced without a check, so this
    presumably raises ``AttributeError`` when no author matches — verify.
    """
    async with local_session() as db:
        candidates = db(Author).where(Author.user == user_id)
        author = candidates.first()
        return author.id
def login_required(f):
    """Decorate an async resolver so it only runs for authenticated callers.

    The wrapper treats its second positional argument as the resolver
    ``info`` object, reads the request from ``info.context["request"]`` and
    validates it with ``check_auth``. On success the caller's author id is
    stored in the context under ``"author_id"`` before the resolver runs.

    Raises:
        Exception: with the message ``"You are not logged in"`` when
            ``check_auth`` reports the caller as unauthenticated.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        ctx = args[1].context
        authenticated, uid = await check_auth(ctx.get("request"))
        if not authenticated:
            raise Exception("You are not logged in")
        # Add author_id to the context so the resolver can use it.
        ctx["author_id"] = await author_id_by_user_id(uid)
        # The user is authenticated: run the wrapped resolver.
        return await f(*args, **kwargs)

    return wrapper
def auth_request(f):
    """Decorate an async request handler so it requires authentication.

    The wrapper treats its first positional argument as the request,
    validates it with ``check_auth`` and, on success, stores the caller's
    author id on the request under the ``"author_id"`` key before invoking
    the handler.

    Raises:
        HTTPUnauthorized: when ``check_auth`` reports the caller as
            unauthenticated.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        request = args[0]
        authenticated, uid = await check_auth(request)
        if authenticated:
            request["author_id"] = await author_id_by_user_id(uid)
            return await f(*args, **kwargs)
        raise HTTPUnauthorized()

    return wrapper