core/auth/authenticate.py
2022-12-02 11:47:55 +03:00

98 lines
3.5 KiB
Python

from functools import wraps
from typing import Optional, Tuple
from graphql.type import GraphQLResolveInfo
from sqlalchemy.orm import joinedload, exc
from sqlalchemy import select, and_
from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection
from auth.credentials import AuthCredentials, AuthUser
from base.orm import local_session
from orm.user import User, Role, UserRole
from settings import SESSION_TOKEN_HEADER
from auth.tokenstorage import SessionToken
from base.exceptions import InvalidToken, Unauthorized, OperationNotAllowed
class JWTAuthenticate(AuthenticationBackend):
async def authenticate(
self, request: HTTPConnection
) -> Optional[Tuple[AuthCredentials, AuthUser]]:
if SESSION_TOKEN_HEADER not in request.headers:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
token = request.headers.get(SESSION_TOKEN_HEADER)
if not token:
print("[auth.authenticate] no token in header %s" % SESSION_TOKEN_HEADER)
return AuthCredentials(scopes=[], error_message=str("no token")), AuthUser(
user_id=None
)
try:
if len(token.split('.')) > 1:
payload = await SessionToken.verify(token)
if payload is None:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
user = None
with local_session() as session:
try:
q = select(
User
).filter(
User.id == payload.user_id
).select_from(User)
user = session.execute(q).unique().one()
except exc.NoResultFound:
user = None
if not user:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
scopes = {} # await user.get_permission()
return (
AuthCredentials(
user_id=payload.user_id,
scopes=scopes,
logged_in=True
),
user,
)
else:
InvalidToken("please try again")
except Exception as e:
print("[auth.authenticate] session token verify error")
print(e)
return AuthCredentials(scopes=[], error_message=str(e)), AuthUser(user_id=None)
def login_required(func):
@wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
# print('[auth.authenticate] login required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
# print(auth)
if not auth or not auth.logged_in:
raise Unauthorized(auth.error_message or "Please login")
return await func(parent, info, *args, **kwargs)
return wrap
def permission_required(resource, operation, func):
@wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
print('[auth.authenticate] permission_required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
if not auth.logged_in:
raise OperationNotAllowed(auth.error_message or "Please login")
# TODO: add actual check permission logix here
return await func(parent, info, *args, **kwargs)
return wrap