fixes-for-inbox-auth-and-startup-faster

This commit is contained in:
tonyrewin 2022-11-26 01:35:42 +03:00
parent 6e073d5dd1
commit 152c3362a0
14 changed files with 168 additions and 138 deletions

View File

@ -9,7 +9,7 @@ from auth.credentials import AuthCredentials, AuthUser
from services.auth.users import UserStorage
from settings import SESSION_TOKEN_HEADER
from auth.tokenstorage import SessionToken
from base.exceptions import InvalidToken
from base.exceptions import InvalidToken, OperationNotAllowed, Unauthorized
class JWTAuthenticate(AuthenticationBackend):
@ -30,27 +30,26 @@ class JWTAuthenticate(AuthenticationBackend):
try:
if len(token.split('.')) > 1:
payload = await SessionToken.verify(token)
if payload is None:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
user = await UserStorage.get_user(payload.user_id)
if not user:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
scopes = await user.get_permission()
return (
AuthCredentials(
user_id=payload.user_id,
scopes=scopes,
logged_in=True
),
user,
)
else:
InvalidToken("please try again")
except Exception as exc:
print("[auth.authenticate] session token verify error")
print(exc)
return AuthCredentials(scopes=[], error_message=str(exc)), AuthUser(
user_id=None
)
if payload is None:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
user = await UserStorage.get_user(payload.user_id)
if not user:
return AuthCredentials(scopes=[]), AuthUser(user_id=None)
scopes = await user.get_permission()
return (
AuthCredentials(user_id=payload.user_id, scopes=scopes, logged_in=True),
user,
)
return AuthCredentials(scopes=[], error_message=str(exc)), AuthUser(user_id=None)
def login_required(func):
@ -58,10 +57,9 @@ def login_required(func):
async def wrap(parent, info: GraphQLResolveInfo, *args, **kwargs):
# print('[auth.authenticate] login required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
if auth and auth.user_id:
print(auth) # debug only
# print(auth)
if not auth.logged_in:
return {"error": auth.error_message or "Please login"}
raise OperationNotAllowed(auth.error_message or "Please login")
return await func(parent, info, *args, **kwargs)
return wrap
@ -73,9 +71,9 @@ def permission_required(resource, operation, func):
print('[auth.authenticate] permission_required for %r with info %r' % (func, info)) # debug only
auth: AuthCredentials = info.context["request"].auth
if not auth.logged_in:
return {"error": auth.error_message or "Please login"}
raise Unauthorized(auth.error_message or "Please login")
# TODO: add check permission logix
# TODO: add actual check permission logix here
return await func(parent, info, *args, **kwargs)

View File

@ -2,7 +2,7 @@ from typing import List, Optional, Text
from pydantic import BaseModel
from base.exceptions import OperationNotAllowed
from base.exceptions import Unauthorized
class Permission(BaseModel):
@ -17,11 +17,13 @@ class AuthCredentials(BaseModel):
@property
def is_admin(self):
# TODO: check admin logix
return True
async def permissions(self) -> List[Permission]:
if self.user_id is None:
raise OperationNotAllowed("Please login first")
raise Unauthorized("Please login first")
# TODO: implement permissions logix
return NotImplemented()

View File

@ -34,7 +34,7 @@ class JWTCodec:
issuer="discours"
)
r = TokenPayload(**payload)
print('[auth.jwtcodec] debug payload %r' % r)
# print('[auth.jwtcodec] debug payload %r' % r)
return r
except jwt.InvalidIssuedAtError:
print('[auth.jwtcodec] invalid issued at: %r' % r)

View File

@ -12,6 +12,7 @@ class RedisCache:
if self._instance is not None:
return
self._instance = await from_url(self._uri, encoding="utf-8")
# print(self._instance)
async def disconnect(self):
if self._instance is None:
@ -23,10 +24,11 @@ class RedisCache:
async def execute(self, command, *args, **kwargs):
while not self._instance:
await sleep(1)
try:
await self._instance.execute_command(command, *args, **kwargs)
except Exception:
pass
try:
print("[redis] " + command + ' ' + ' '.join(args))
return await self._instance.execute_command(command, *args, **kwargs)
except Exception:
pass
async def lrange(self, key, start, stop):
return await self._instance.lrange(key, start, stop)

View File

@ -20,6 +20,7 @@ from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat
from services.stat.viewed import ViewedStorage
from services.zine.gittask import GitTask
from services.zine.shoutauthor import ShoutAuthorStorage
from settings import DEV_SERVER_STATUS_FILE_NAME
import_module("resolvers")
@ -39,11 +40,14 @@ async def start_up():
print(views_stat_task)
reacted_storage_task = asyncio.create_task(ReactedStorage.worker())
print(reacted_storage_task)
shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker())
print(shout_author_task)
topic_stat_task = asyncio.create_task(TopicStat.worker())
print(topic_stat_task)
git_task = asyncio.create_task(GitTask.git_task_worker())
print(git_task)
async def dev_start_up():
if exists(DEV_SERVER_STATUS_FILE_NAME):
return

View File

@ -17,7 +17,7 @@ def migrate(entry):
"username": email,
"email": email,
"createdAt": parse(entry["createdAt"]),
"emailConfirmed": bool(entry["emails"][0]["verified"]),
"emailConfirmed": ("@discours.io" in email) or bool(entry["emails"][0]["verified"]),
"muted": False, # amnesty
"bio": entry["profile"].get("bio", ""),
"notifications": [],

View File

@ -48,8 +48,8 @@ from resolvers.zine.load import (
from resolvers.inbox.chats import (
create_chat,
delete_chat,
update_chat,
invite_to_chat
update_chat
)
from resolvers.inbox.messages import (
create_message,
@ -111,7 +111,6 @@ __all__ = [
# inbox
"load_chats",
"load_messages_by",
"invite_to_chat",
"create_chat",
"delete_chat",
"update_chat",

View File

@ -13,7 +13,7 @@ from auth.identity import Identity, Password
from auth.jwtcodec import JWTCodec
from auth.tokenstorage import TokenStorage
from base.exceptions import (BaseHttpException, InvalidPassword, InvalidToken,
ObjectNotExist, OperationNotAllowed)
ObjectNotExist, OperationNotAllowed, Unauthorized)
from base.orm import local_session
from base.resolvers import mutation, query
from orm import Role, User
@ -37,7 +37,7 @@ async def get_current_user(_, info):
"news": await user_subscriptions(user.slug),
}
else:
raise OperationNotAllowed("No session token present in request, try to login")
raise Unauthorized("No session token present in request, try to login")
@mutation.field("confirmEmail")

View File

@ -7,43 +7,6 @@ from base.redis import redis
from base.resolvers import mutation
async def add_user_to_chat(user_slug: str, chat_id: str, chat=None):
for member in chat["users"]:
chats_ids = await redis.execute("GET", f"chats_by_user/{member}")
if chats_ids:
chats_ids = list(json.loads(chats_ids))
else:
chats_ids = []
if chat_id not in chats_ids:
chats_ids.append(chat_id)
await redis.execute("SET", f"chats_by_user/{member}", json.dumps(chats_ids))
@mutation.field("inviteChat")
async def invite_to_chat(_, info, invited: str, chat_id: str):
''' invite user with :slug to chat with :chat_id '''
user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat:
return {
"error": "chat not exist"
}
chat = dict(json.loads(chat))
if not chat['private'] and user.slug not in chat['admins']:
return {
"error": "only admins can invite to private chat",
"chat": chat
}
else:
chat["users"].append(invited)
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
return {
"error": None,
"chat": chat
}
@mutation.field("updateChat")
@login_required
async def update_chat(_, info, chat_new: dict):
@ -71,9 +34,8 @@ async def update_chat(_, info, chat_new: dict):
"admins": chat_new.get("admins", chat["admins"]),
"users": chat_new.get("users", chat["users"])
})
await add_user_to_chat(user.slug, chat_id, chat)
await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat.id}/next_message_id", 0)
await redis.execute("COMMIT")
return {
"error": None,
@ -97,11 +59,22 @@ async def create_chat(_, info, title="", members=[]):
"users": members,
"admins": [user.slug, ]
}
# double creation protection
cids = await redis.execute("SMEMBERS", f"chats_by_user/{user.slug}")
for cid in cids:
c = await redis.execute("GET", F"chats/{cid.decode('utf-8')}")
isc = [x for x in c["users"] if x not in chat["users"]]
if isc == [] and chat["title"] == c["title"]:
return {
"error": "chat was created before",
"chat": chat
}
await add_user_to_chat(user.slug, chat_id, chat)
for m in members:
await redis.execute("SADD", f"chats_by_user/{m}", chat_id)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
await redis.execute("COMMIT")
return {
"error": None,
"chat": chat
@ -117,6 +90,8 @@ async def delete_chat(_, info, chat_id: str):
chat = dict(json.loads(chat))
if user.slug in chat['admins']:
await redis.execute("DEL", f"chats/{chat_id}")
await redis.execute("SREM", "chats_by_user/" + user, chat_id)
await redis.execute("COMMIT")
else:
return {
"error": "chat not exist"

View File

@ -5,52 +5,51 @@ from auth.authenticate import login_required
from base.redis import redis
from base.orm import local_session
from base.resolvers import query
from base.exceptions import ObjectNotExist
from orm.user import User
from resolvers.zine.profile import followed_authors
from .unread import get_unread_counter
async def load_messages(chatId: str, limit: int, offset: int):
''' load :limit messages for :chatId with :offset '''
async def load_messages(chat_id: str, limit: int, offset: int):
''' load :limit messages for :chat_id with :offset '''
messages = []
message_ids = await redis.lrange(
f"chats/{chatId}/message_ids", 0 - offset - limit, 0 - offset
f"chats/{chat_id}/message_ids", offset + limit, offset
)
if message_ids:
message_keys = [
f"chats/{chatId}/messages/{mid}" for mid in message_ids
f"chats/{chat_id}/messages/{mid}" for mid in message_ids
]
messages = await redis.mget(*message_keys)
messages = [json.loads(msg) for msg in messages]
return {
"messages": messages,
"error": None
}
return messages
@query.field("loadChats")
@login_required
async def load_chats(_, info, limit: int, offset: int):
async def load_chats(_, info, limit: int = 50, offset: int = 0):
""" load :limit chats of current user with :offset """
user = info.context["request"].user
if user:
chats = await redis.execute("GET", f"chats_by_user/{user.slug}")
if chats:
chats = list(json.loads(chats))[offset:offset + limit]
if not chats:
chats = []
for c in chats:
c['messages'] = await load_messages(c['id'], limit, offset)
c['unread'] = await get_unread_counter(c['id'], user.slug)
return {
"chats": chats,
"error": None
}
else:
return {
"error": "please login",
"chats": []
}
print('[inbox] load user\'s chats')
cids = await redis.execute("SMEMBERS", "chats_by_user/" + user.slug)
if cids:
cids = list(cids)[offset:offset + limit]
if not cids:
print('[inbox.load] no chats were found')
cids = []
chats = []
for cid in cids:
c = await redis.execute("GET", "chats/" + cid.decode("utf-8"))
if c:
c = json.loads(c)
c['messages'] = await load_messages(cid, 50, 0)
c['unread'] = await get_unread_counter(cid, user.slug)
chats.append(c)
return {
"chats": chats,
"error": None
}
@query.field("loadMessagesBy")
@ -58,28 +57,36 @@ async def load_chats(_, info, limit: int, offset: int):
async def load_messages_by(_, info, by, limit: int = 50, offset: int = 0):
''' load :amolimitunt messages of :chat_id with :offset '''
user = info.context["request"].user
my_chats = await redis.execute("GET", f"chats_by_user/{user.slug}")
chat_id = by.get('chat')
if chat_id:
chat = await redis.execute("GET", f"chats/{chat_id}")
cids = await redis.execute("SMEMBERS", "chats_by_user/" + user.slug)
by_chat = by.get('chat')
messages = []
if by_chat:
chat = await redis.execute("GET", f"chats/{by_chat}")
if not chat:
return {
"error": "chat not exist"
}
messages = await load_messages(chat_id, limit, offset)
user_id = by.get('author')
if user_id:
chats = await redis.execute("GET", f"chats_by_user/{user_id}")
our_chats = list(set(chats) & set(my_chats))
for c in our_chats:
messages += await load_messages(c, limit, offset)
raise ObjectNotExist("Chat not exists")
messages = await load_messages(by_chat, limit, offset)
by_author = by.get('author')
if by_author:
if not by_chat:
# all author's messages
by_author_cids = await redis.execute("SMEMBERS", f"chats_by_user/{by_author}")
for c in list(by_author_cids & cids):
messages += await load_messages(c, limit, offset)
else:
# author's messages in chat
messages = filter(lambda m: m["author"] == by_author, messages)
body_like = by.get('body')
if body_like:
for c in my_chats:
mmm = await load_messages(c, limit, offset)
for m in mmm:
if body_like in m["body"]:
messages.append(m)
if not by_chat:
# search in all messages in all user's chats
for c in list(cids):
mmm = await load_messages(c, limit, offset)
for m in mmm:
if body_like in m["body"]:
messages.append(m)
else:
# search in chat's messages
messages = filter(lambda m: body_like in m["body"], messages)
days = by.get("days")
if days:
messages = filter(

View File

@ -17,6 +17,6 @@ async def get_total_unread_counter(user_slug: str):
if chats:
chats = json.loads(chats)
for chat_id in chats:
n = await get_unread_counter(chat_id, user_slug)
n = await get_unread_counter(chat_id.decode('utf-8'), user_slug)
unread += n
return unread

View File

@ -151,7 +151,6 @@ type Mutation {
createChat(title: String, members: [String]!): Result!
updateChat(chat: ChatInput!): Result!
deleteChat(chatId: String!): Result!
inviteChat(chatId: String!, userslug: String!): Result!
createMessage(chat: String!, body: String!, replyTo: String): Result!
updateMessage(chatId: String!, id: Int!, body: String!): Result!
@ -515,13 +514,13 @@ type Message {
type Chat {
id: String!
createdAt: Int!
createdBy: User!
createdBy: String!
updatedAt: Int!
title: String
description: String
users: [User]!
admins: [User]
messages: [Message]!
users: [String]!
admins: [String]
messages: [Message]
unread: Int
private: Boolean
}

View File

@ -3,7 +3,7 @@ import time
from base.orm import local_session
from orm.shout import Shout, ShoutTopic, ShoutAuthor
from orm.topic import TopicFollower
from sqlalchemy.sql.expression import select
# from sqlalchemy.sql.expression import select
class TopicStat:
@ -20,21 +20,19 @@ class TopicStat:
start = time.time()
self = TopicStat
shout_topics = session.query(ShoutTopic, Shout).join(Shout).all()
all_shout_authors = session.query(ShoutAuthor).all()
print("[stat.topics] %d links for shouts" % len(shout_topics))
for [shout_topic, shout] in shout_topics:
tpc = shout_topic.topic
# shouts by topics
# shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first()
self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict())
self.shouts_by_topic[tpc][shout.slug] = shout
# authors by topics
shout_authors = filter(lambda asa: asa.shout == shout.slug, all_shout_authors)
self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict())
for sa in shout_authors:
self.authors_by_topic[tpc][sa.shout] = sa.caption
authors = session.query(
ShoutAuthor.user, ShoutAuthor.caption
).filter(
ShoutAuthor.shout == shout.slug
).all()
for a in authors:
self.authors_by_topic[tpc][a[0]] = a[1]
self.followers_by_topic = {}
followings = session.query(TopicFollower).all()

View File

@ -0,0 +1,46 @@
import asyncio
from base.orm import local_session
from orm.shout import ShoutAuthor, Shout
class ShoutAuthorStorage:
authors_by_shout = {}
lock = asyncio.Lock()
period = 30 * 60 # sec
@staticmethod
async def load_captions(session):
self = ShoutAuthorStorage
sas = session.query(ShoutAuthor).join(Shout).all()
for sa in sas:
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, [])
self.authors_by_shout[sa.shout].append([sa.user, sa.caption])
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
@staticmethod
async def get_authors(shout):
self = ShoutAuthorStorage
async with self.lock:
return self.authors_by_shout.get(shout, [])
@staticmethod
async def get_author_caption(shout, author):
self = ShoutAuthorStorage
async with self.lock:
for a in self.authors_by_shout.get(shout, []):
if author in a:
return a[1]
return {"error": "author caption not found"}
@staticmethod
async def worker():
self = ShoutAuthorStorage
while True:
try:
with local_session() as session:
async with self.lock:
await self.load_captions(session)
print("[zine.authors] index by authors was updated")
except Exception as err:
print("[zine.authors] error indexing by author: %s" % (err))
await asyncio.sleep(self.period)