core/auth/authenticate.py
2022-11-28 17:15:54 +03:00

81 lines
3.0 KiB
Python

from functools import wraps
from typing import Optional, Tuple
from graphql.type import GraphQLResolveInfo
from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection
from auth.credentials import AuthCredentials, AuthUser
from services.auth.users import UserStorage
from settings import SESSION_TOKEN_HEADER
from auth.tokenstorage import SessionToken
from base.exceptions import InvalidToken, OperationNotAllowed, Unauthorized
class JWTAuthenticate(AuthenticationBackend):
async def authenticate(
self, request: HTTPConnection
) -> Optional[Tuple[AuthCredentials, AuthUser]]:
if SESSION_TOKEN_HEADER not in request.headers:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
token = request.headers.get(SESSION_TOKEN_HEADER)
if not token:
print("[auth.authenticate] no token in header %s" % SESSION_TOKEN_HEADER)
return AuthCredentials(scopes=[], error_message=str("no token")), AuthUser(
user_id=None
)
try:
if len(token.split('.')) > 1:
payload = await SessionToken.verify(token)
if payload is None:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
user = await UserStorage.get_user(payload.user_id)
if not user:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
scopes = await user.get_permission()
return (
AuthCredentials(
user_id=payload.user_id,
scopes=scopes,
logged_in=True
),
user,
)
else:
InvalidToken("please try again")
except Exception as exc:
print("[auth.authenticate] session token verify error")
print(exc)
return AuthCredentials(scopes=[], error_message=str(exc)), AuthUser(user_id=None)
def login_required(func):
@wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
# print('[auth.authenticate] login required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
# print(auth)
if not auth or not auth.logged_in:
raise OperationNotAllowed(auth.error_message or "Please login")
return await func(parent, info, *args, **kwargs)
return wrap
def permission_required(resource, operation, func):
@wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
print('[auth.authenticate] permission_required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
if not auth.logged_in:
raise Unauthorized(auth.error_message or "Please login")
# TODO: add actual check permission logix here
return await func(parent, info, *args, **kwargs)
return wrap