core/auth/authenticate.py

94 lines
3.3 KiB
Python

from functools import wraps
from typing import Optional, Tuple
from graphql.type import GraphQLResolveInfo
from sqlalchemy.orm import joinedload, exc
from starlette.authentication import AuthenticationBackend
from starlette.requests import HTTPConnection
from auth.credentials import AuthCredentials, AuthUser
from services.db import local_session
from orm.user import User, Role
from settings import SESSION_TOKEN_HEADER
from auth.tokenstorage import SessionToken
from services.exceptions import OperationNotAllowed
class JWTAuthenticate(AuthenticationBackend):
async def authenticate(
self, request: HTTPConnection
) -> Optional[Tuple[AuthCredentials, AuthUser]]:
if SESSION_TOKEN_HEADER not in request.headers:
return AuthCredentials(scopes={}), AuthUser(user_id=None, username="")
token = request.headers.get(SESSION_TOKEN_HEADER)
if not token:
print("[auth.authenticate] no token in header %s" % SESSION_TOKEN_HEADER)
return AuthCredentials(scopes={}, error_message=str("no token")), AuthUser(
user_id=None, username=""
)
if len(token.split(".")) > 1:
payload = await SessionToken.verify(token)
with local_session() as session:
try:
user = (
session.query(User)
.options(
joinedload(User.roles).options(
joinedload(Role.permissions)
),
joinedload(User.ratings),
)
.filter(User.id == payload.user_id)
.one()
)
scopes = {} # TODO: integrate await user.get_permission()
return (
AuthCredentials(
user_id=payload.user_id, scopes=scopes, logged_in=True
),
AuthUser(user_id=user.id, username=""),
)
except exc.NoResultFound:
pass
return AuthCredentials(scopes={}, error_message=str("Invalid token")), AuthUser(
user_id=None, username=""
)
def login_required(func):
@wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
# print('[auth.authenticate] login required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
# print(auth)
if not auth or not auth.logged_in:
# raise Unauthorized(auth.error_message or "Please login")
return {"error": "Please login first"}
return await func(parent, info, *args, **kwargs)
return wrap
def permission_required(resource, operation, func):
@wraps(func)
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
print(
"[auth.authenticate] permission_required for %r with info %r" % (func, info)
) # debug only
auth: AuthCredentials = info.context["request"].auth
if not auth.logged_in:
raise OperationNotAllowed(auth.error_message or "Please login")
# TODO: add actual check permission logix here
return await func(parent, info, *args, **kwargs)
return wrap