152 lines
5.0 KiB
Python
152 lines
5.0 KiB
Python
from urllib.parse import quote_plus
|
|
from datetime import datetime
|
|
|
|
from graphql.type import GraphQLResolveInfo
|
|
from transliterate import translit
|
|
|
|
from auth.tokenstorage import TokenStorage
|
|
from auth.authenticate import login_required
|
|
from auth.email import send_auth_email
|
|
from auth.identity import Identity, Password
|
|
from base.exceptions import (
|
|
InvalidPassword,
|
|
InvalidToken,
|
|
ObjectNotExist,
|
|
OperationNotAllowed,
|
|
)
|
|
from base.orm import local_session
|
|
from base.resolvers import mutation, query
|
|
from orm import User, Role
|
|
from resolvers.profile import get_user_info
|
|
from settings import SESSION_TOKEN_HEADER
|
|
|
|
|
|
@mutation.field("refreshSession")
@login_required
async def get_current_user(_, info):
    """Refresh the session of the user attached to the current request.

    Stamps ``lastSeen`` with the current local time, persists the user and
    issues a new session token.

    Returns:
        dict with the new ``token``, the ``user`` object and the ``info``
        obtained from ``get_user_info`` for the user's slug.
    """
    request = info.context["request"]
    current_user = request.user
    current_user.lastSeen = datetime.now()

    with local_session() as db:
        db.add(current_user)
        db.commit()

    fresh_token = await TokenStorage.create_session(current_user)
    profile_info = await get_user_info(current_user.slug)
    return {"token": fresh_token, "user": current_user, "info": profile_info}
|
|
|
|
|
|
@mutation.field("confirmEmail")
async def confirm_email(*_, confirm_token):
    """Confirm ownership of an email address and open a session.

    Resolves ``confirm_token`` to a user id through ``TokenStorage.get``,
    marks that user's email as confirmed and issues a session token.

    Returns:
        ``{"token": ..., "user": ...}`` on success, or
        ``{"error": "email not confirmed"}`` when no user matches the token
        or any error other than ``InvalidToken`` occurs.

    Raises:
        InvalidToken: re-raised with the original message when the token
            is rejected.
    """
    try:
        user_id = await TokenStorage.get(confirm_token)
        with local_session() as session:
            user = session.query(User).where(User.id == user_id).first()
            if user is None:
                # No account behind this token: answer with the regular
                # error payload instead of tripping over attribute access.
                return {"error": "email not confirmed"}
            # create_session is a coroutine function; it has to be awaited,
            # otherwise the response carries a coroutine object, not a token.
            session_token = await TokenStorage.create_session(user)
            user.emailConfirmed = True
            session.add(user)
            session.commit()
            return {"token": session_token, "user": user}
    except InvalidToken as e:
        raise InvalidToken(e.message) from e
    except Exception as e:
        print(e)  # FIXME: debug only
    return {"error": "email not confirmed"}
|
|
|
|
|
|
@mutation.field("registerUser")
async def register(*_, email: str, password: str = ""):
    """Create a new user account and email a one-time auth token.

    The username is the part of ``email`` before the first ``@``. The slug
    is that username transliterated with ``translit(..., "ru",
    reversed=True)``, with dots replaced by dashes, lowercased and
    URL-quoted. The password is encoded and stored only when it is non-empty.

    Returns:
        ``{"user": <new User>}``.

    Raises:
        OperationNotAllowed: a user with this email already exists.
    """
    with local_session() as db:
        existing = db.query(User).filter(User.email == email).first()
    if existing:
        raise OperationNotAllowed("User already exist")

    username = email.split("@")[0]
    latin_name = translit(username, "ru", reversed=True)
    slug = quote_plus(latin_name.replace(".", "-").lower())

    fields = {"email": email, "username": username, "slug": slug}
    if password:
        fields["password"] = Password.encode(password)

    new_user = User(**fields)
    new_user.roles.append(Role.default_role)
    with local_session() as db:
        db.add(new_user)
        db.commit()

    onetime_token = await TokenStorage.create_onetime(new_user)
    await send_auth_email(new_user, onetime_token)

    return {"user": new_user}
|
|
|
|
|
|
@mutation.field("sendLink")
async def auth_send_link(_, info, email):
    """Email a one-time auth token to the user registered under ``email``.

    Returns:
        An empty dict once the email has been sent.

    Raises:
        ObjectNotExist: no user is registered with this email.
    """
    with local_session() as db:
        account = db.query(User).filter(User.email == email).first()

    if account:
        onetime_token = await TokenStorage.create_onetime(account)
        await send_auth_email(account, onetime_token)
        return {}

    raise ObjectNotExist("User not found")
|
|
|
|
|
|
@query.field("signIn")
async def login(_, info: GraphQLResolveInfo, email: str, password: str = ""):
    """Sign a user in by password, or email a one-time link when none is given.

    Flow:
      * unknown email -> ``ObjectNotExist``;
      * empty ``password`` -> a one-time token is emailed and an ``error``
        payload saying so is returned;
      * email not confirmed yet -> an ``error`` payload asking to confirm;
      * otherwise the password is checked and a session is created.

    Returns:
        ``{"token", "user", "info"}`` on successful sign-in, or a dict with
        a single ``"error"`` key for the two warning cases above.

    Raises:
        ObjectNotExist: no user is registered with this email.
        InvalidPassword: the supplied password was rejected.
    """
    with local_session() as session:
        orm_user = session.query(User).filter(User.email == email).first()
        if orm_user is None:
            print(f"[auth] {email}: email not found")
            raise ObjectNotExist("User not found")  # contains webserver status

        if not password:
            print(f"[auth] send confirm link to {email}")
            token = await TokenStorage.create_onetime(orm_user)
            await send_auth_email(orm_user, token)
            # FIXME: not an error, warning
            return {"error": "no password, email link was sent"}

        # sign in using password
        if not orm_user.emailConfirmed:
            # not an error, warns users
            return {"error": "please, confirm email"}

        # Only the password check is guarded, so the handler below cannot
        # mask failures coming from session creation or profile loading.
        try:
            user = Identity.password(orm_user, password)
        except InvalidPassword:
            print(f"[auth] {email}: invalid password")
            raise InvalidPassword("invalid password")  # contains webserver status

        session_token = await TokenStorage.create_session(user)
        print(f"[auth] user {email} authorized")
        return {
            "token": session_token,
            "user": user,
            "info": await get_user_info(user.slug),
        }
|
|
|
|
|
|
@query.field("signOut")
@login_required
async def sign_out(_, info: GraphQLResolveInfo):
    """Revoke the session token sent in the ``SESSION_TOKEN_HEADER`` header.

    Returns:
        Whatever ``TokenStorage.revoke`` reports for that token.
    """
    request = info.context["request"]
    session_token = request.headers[SESSION_TOKEN_HEADER]
    return await TokenStorage.revoke(session_token)
|
|
|
|
|
|
@query.field("isEmailUsed")
async def is_email_used(_, info, email):
    """Tell whether a user with the given email already exists.

    Returns:
        ``True`` when the lookup finds a user, ``False`` otherwise.
    """
    with local_session() as db:
        match = db.query(User).filter(User.email == email).first()
    return match is not None
|