core/auth/authorize.py

46 lines
1.4 KiB
Python
Raw Normal View History

from datetime import datetime, timedelta
from auth.jwtcodec import JWTCodec
2022-08-11 05:53:14 +00:00
from base.redis import redis
from settings import JWT_LIFE_SPAN
from auth.validations import User
2022-09-03 10:50:14 +00:00
2022-01-13 12:16:35 +00:00
class TokenStorage:
    """Redis-backed registry of issued token keys."""

    @staticmethod
    async def save(token_key, life_span, auto_delete=True):
        """Register ``token_key`` in Redis, optionally with an expiry.

        Args:
            token_key: Redis key under which the marker value ``"True"`` is
                stored.
            life_span: Lifetime of the key in seconds. Only used when
                ``auto_delete`` is true; fractional seconds are truncated.
            auto_delete: When true, the key expires after ``life_span``
                seconds. When false, the key is stored without an expiry.
        """
        if not auto_delete:
            await redis.execute("SET", token_key, "True")
            return

        ttl = int(life_span)
        if ttl <= 0:
            # Redis rejects a non-positive EX value. A token whose lifetime
            # has already elapsed must not remain registered, so make sure
            # no key is left behind (same end state as an EXPIREAT in the
            # past).
            await redis.execute("DEL", token_key)
            return

        # Set the value and its TTL in one command: the key can never be left
        # without an expiry if a follow-up command fails, and the relative
        # TTL does not depend on this host's clock agreeing with the Redis
        # server's clock.
        await redis.execute("SET", token_key, "True", "EX", ttl)

    @staticmethod
    async def exist(token_key):
        """Return the result of ``GET token_key``.

        The raw Redis reply is returned rather than a ``bool``; callers are
        expected to test it for truthiness. Presumably this is ``None`` when
        the key is absent -- confirm against the ``redis.execute`` wrapper.
        """
        return await redis.execute("GET", token_key)
2022-01-13 12:16:35 +00:00
class Authorize:
    """Issue and revoke JWT session tokens that are tracked in Redis.

    Every issued token is registered under the key ``"<user id>-<token>"``.
    """

    @staticmethod
    async def authorize(
        user: User, device: str = "pc", life_span=JWT_LIFE_SPAN, auto_delete=True
    ) -> str:
        """Create a token for ``user`` and register it in the token storage.

        Args:
            user: The user the token is issued for.
            device: Device label passed through to ``JWTCodec.encode``.
            life_span: Token lifetime in seconds.
            auto_delete: Forwarded to ``TokenStorage.save``; when true the
                stored key expires together with the token.

        Returns:
            The encoded token.
        """
        # NOTE(review): ``utcnow()`` yields a naive datetime; how JWTCodec
        # interprets it is not visible here -- confirm before changing it to
        # a timezone-aware value.
        exp = datetime.utcnow() + timedelta(seconds=life_span)
        token = JWTCodec.encode(user, exp=exp, device=device)
        await TokenStorage.save(f"{user.id}-{token}", life_span, auto_delete)
        return token

    @staticmethod
    async def revoke(token: str) -> bool:
        """Remove ``token`` from the token storage (best effort).

        A token that cannot be decoded has no derivable storage key, so
        there is nothing to delete and the call is a no-op.

        Returns:
            Always ``True``, whether or not the token could be decoded.
        """
        try:
            payload = JWTCodec.decode(token)
        except Exception:
            # Deliberately best-effort, but do not swallow KeyboardInterrupt
            # or SystemExit the way a bare ``except:`` would.
            return True

        await redis.execute("DEL", f"{payload.user_id}-{token}")
        return True

    @staticmethod
    async def revoke_all(user: User):
        """Remove every stored token key belonging to ``user``."""
        # NOTE(review): KEYS scans the whole keyspace; consider SCAN if the
        # database grows large.
        tokens = await redis.execute("KEYS", f"{user.id}-*")
        # DEL requires at least one key; calling it with none is rejected by
        # Redis, so skip it when the user has no stored tokens.
        if tokens:
            await redis.execute("DEL", *tokens)