refactored-cache-following

Untone 2024-04-09 11:17:32 +03:00
parent b802bb029a
commit d3ae078b20
6 changed files with 243 additions and 403 deletions

View File

@@ -19,7 +19,7 @@ from resolvers.reader import (get_shout, load_shouts_by, load_shouts_feed,
                              load_shouts_search, load_shouts_unrated)
 from resolvers.topic import (get_topic, get_topics_all, get_topics_by_author,
                              get_topics_by_community)
-from services.cache import events_register
+from services.triggers import events_register

 events_register()

View File

@@ -1,4 +1,3 @@
-import asyncio
 import json
 import time

@@ -9,10 +8,9 @@ from sqlalchemy_searchable import search
 from orm.author import Author, AuthorFollower
 from orm.shout import ShoutAuthor, ShoutTopic
 from orm.topic import Topic
-from resolvers.stat import (author_follows_authors, author_follows_topics,
-                            get_authors_with_stat_cached, get_with_stat)
+from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat
 from services.auth import login_required
-from services.cache import set_author_cache, update_author_followers_cache
+from services.cache import cache_author, cache_follower
 from services.db import local_session
 from services.encoders import CustomJSONEncoder
 from services.logger import root_logger as logger
@@ -55,22 +53,25 @@ async def get_author(_, _info, slug='', author_id=0):
     author = None
     author_dict = None
     try:
-        author_query = select(Author).filter(
-            or_(Author.slug == slug, Author.id == author_id)
-        )
-        result = await get_authors_with_stat_cached(author_query)
-        if not result:
-            raise ValueError('Author not found')
-        [author] = result
-        author_id = author.id
-        logger.debug(f'found @{slug} with id {author_id}')
-        if isinstance(author, Author):
-            if not author.stat:
-                [author] = get_with_stat(author_query)  # FIXME: with_rating=True)
-                if author:
-                    await set_author_cache(author.dict())
-                    logger.debug('updated author stored in cache')
-            author_dict = author.dict()
+        # lookup for cached author
+        author_query = select(Author).filter(or_(Author.slug == slug, Author.id == author_id))
+        found_author = local_session().execute(author_query).first()
+        logger.debug(f'found author id: {found_author.id}')
+        author_id = found_author.id if not found_author.id else author_id
+        cached_result = await redis.execute('GET', f'author:{author_id}')
+        author_dict = json.loads(cached_result) if cached_result else None
+        # update stat from db
+        if not author_dict or not author_dict.get('stat'):
+            result = get_with_stat(author_query)
+            if not result:
+                raise ValueError('Author not found')
+            [author] = result
+            # use found author
+            if isinstance(author, Author):
+                logger.debug(f'update @{author.slug} with id {author.id}')
+                author_dict = author.dict()
+                await cache_author(author_dict)
     except ValueError:
         pass
     except Exception:
@@ -95,11 +96,11 @@ async def get_author_by_user_id(user_id: str):
             logger.debug(f'got author @{author_slug} #{author_id} cached')
             return author

-        q = select(Author).filter(Author.user == user_id)
-        result = await get_authors_with_stat_cached(q)
+        author_query = select(Author).filter(Author.user == user_id)
+        result = get_with_stat(author_query)
         if result:
             [author] = result
-            await set_author_cache(author.dict())
+            await cache_author(author.dict())
     except Exception as exc:
         import traceback
@@ -286,36 +287,32 @@ def create_author(user_id: str, slug: str, name: str = ''):
 async def get_author_followers(_, _info, slug: str):
     logger.debug(f'getting followers for @{slug}')
     try:
-        with local_session() as session:
-            author_alias = aliased(Author)
-            result = (
-                session.query(author_alias).filter(author_alias.slug == slug).first()
-            )
-            if result:
-                [author] = result
-                author_id = author.id
-                cached = await redis.execute('GET', f'author:{author_id}:followers')
-                if not cached:
-                    author_follower_alias = aliased(AuthorFollower, name='af')
-                    q = select(Author).join(
-                        author_follower_alias,
-                        and_(
-                            author_follower_alias.author == author_id,
-                            author_follower_alias.follower == Author.id,
-                        ),
-                    )
-                    results = await get_authors_with_stat_cached(q)
-                    _ = asyncio.create_task(
-                        update_author_followers_cache(
-                            author_id, [x.dict() for x in results]
-                        )
-                    )
-                    logger.debug(f'@{slug} cache updated with {len(results)} followers')
-                    return results
-                else:
-                    logger.debug(f'@{slug} got followers cached')
-                    if isinstance(cached, str):
-                        return json.loads(cached)
+        author_alias = aliased(Author)
+        author_query = select(author_alias).filter(author_alias.slug == slug)
+        result = local_session().execute(author_query).first()
+        if result:
+            [author] = result
+            author_id = author.id
+            cached = await redis.execute('GET', f'author:{author_id}:followers')
+            if not cached:
+                author_follower_alias = aliased(AuthorFollower, name='af')
+                q = select(Author).join(
+                    author_follower_alias,
+                    and_(
+                        author_follower_alias.author == author_id,
+                        author_follower_alias.follower == Author.id,
+                    ),
+                )
+                results = get_with_stat(q)
+                if isinstance(results, list):
+                    for follower in results:
+                        await cache_follower(follower, author)
+                logger.debug(f'@{slug} cache updated with {len(results)} followers')
+                return results
+            else:
+                logger.debug(f'@{slug} got followers cached')
+                if isinstance(cached, str):
+                    return json.loads(cached)
     except Exception as exc:
         import traceback
@@ -327,4 +324,4 @@ async def get_author_followers(_, _info, slug: str):
 @query.field('search_authors')
 async def search_authors(_, _info, what: str):
     q = search(select(Author), what)
-    return await get_authors_with_stat_cached(q)
+    return get_with_stat(q)
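
The get_author change above is a cache-aside lookup: try the author:{id} key in Redis, fall back to the database query plus stat aggregation on a miss (or when the cached record has no stat), then write the fresh dict back through cache_author. A minimal, self-contained sketch of that flow, using redis.asyncio directly and a hypothetical load_author_with_stat() in place of the project's get_with_stat(author_query):

# Cache-aside sketch only; `load_author_with_stat` is a hypothetical stand-in
# for the project's SQLAlchemy query + stat columns.
import json

import redis.asyncio as aioredis

redis_client = aioredis.Redis()


async def load_author_with_stat(author_id: int) -> dict:
    # Placeholder for get_with_stat(select(Author).filter(Author.id == author_id))
    return {'id': author_id, 'slug': f'author-{author_id}', 'stat': {'followers': 0}}


async def get_author_cached(author_id: int) -> dict:
    key = f'author:{author_id}'
    cached = await redis_client.get(key)                  # 1. try the cache
    author = json.loads(cached) if cached else None
    if not author or not author.get('stat'):              # 2. miss or missing stat
        author = await load_author_with_stat(author_id)   #    -> hit the database
        await redis_client.set(key, json.dumps(author))   # 3. refill the cache
    return author

The same direction applies to get_author_by_user_id and search_authors above, which now call get_with_stat directly instead of the removed get_authors_with_stat_cached.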

View File

@@ -8,16 +8,12 @@ from sqlalchemy.sql import and_
 from orm.author import Author, AuthorFollower
 from orm.community import Community
-# from orm.community import Community
 from orm.reaction import Reaction
 from orm.shout import Shout, ShoutReactionsFollower
 from orm.topic import Topic, TopicFollower
-from resolvers.stat import (author_follows_authors, author_follows_topics,
-                            get_authors_with_stat_cached,
-                            get_topics_with_stat_cached)
+from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat
 from services.auth import login_required
-from services.cache import (DEFAULT_FOLLOWS, update_followers_for_author,
-                            update_follows_for_author)
+from services.cache import DEFAULT_FOLLOWS
 from services.db import local_session
 from services.logger import root_logger as logger
 from services.notify import notify_follower
@@ -33,53 +29,37 @@ async def follow(_, info, what, slug):
     user_id = info.context.get('user_id')
     if not user_id:
         return {'error': 'unauthorized'}
-    [follower] = await get_authors_with_stat_cached(
-        select(Author).select_from(Author).filter(Author.user == user_id)
-    )
+    follower_query = select(Author).select_from(Author).filter(Author.user == user_id)
+    [follower] = local_session().execute(follower_query)
     if not follower:
         return {'error': 'cant find follower'}
     if what == 'AUTHOR':
         error = author_follow(follower.id, slug)
         if not error:
-            logger.debug(f'@{follower.slug} followed @{slug}')
-            [author] = await get_authors_with_stat_cached(
-                select(Author).select_from(Author).where(Author.slug == slug)
-            )
-            if not author:
-                return {'error': 'author is not found'}
-            follows = await update_follows_for_author(
-                follower, 'author', author.dict(), True
-            )
-            _followers = await update_followers_for_author(follower, author, True)
-            await notify_follower(follower.dict(), author.id, 'unfollow')
+            author_query = select(Author).where(Author.slug == slug)
+            [author] = local_session().execute(author_query)
+            await notify_follower(follower.dict(), author.id, 'follow')
     elif what == 'TOPIC':
         error = topic_follow(follower.id, slug)
-        if not error:
-            [topic] = await get_topics_with_stat_cached(
-                select(Topic).where(Topic.slug == slug)
-            )
-            if not topic:
-                return {'error': 'topic is not found'}
-            follows = await update_follows_for_author(
-                follower, 'topic', topic.dict(), True
-            )
     elif what == 'COMMUNITY':
+        # FIXME: when more communities
         follows = local_session().execute(select(Community))
     elif what == 'SHOUT':
         error = reactions_follow(follower.id, slug)
-        if not error:
-            [shout] = local_session().execute(select(Shout).where(Shout.slug == slug))
-            if not shout:
-                return {'error': 'cant find shout'}
-            follows = await update_follows_for_author(
-                follower, 'shout', shout.dict(), True
-            )
-    return {f'{what.lower()}s': follows, 'error': error}
+    if error:
+        return {'error': error}
+    entity = what.lower()
+    follows_str = await redis.execute('GET', f'author:{follower.id}:follows-{entity}s')
+    if follows_str:
+        follows = json.loads(follows_str)
+    return { f'{entity}s': follows }


 @mutation.field('unfollow')
@@ -91,54 +71,33 @@ async def unfollow(_, info, what, slug):
     if not user_id:
         return {'error': 'unauthorized'}
     follower_query = select(Author).filter(Author.user == user_id)
-    [follower] = await get_authors_with_stat_cached(follower_query)
+    [follower] = local_session().execute(follower_query)
     if not follower:
         return {'error': 'follower profile is not found'}
     if what == 'AUTHOR':
         error = author_unfollow(follower.id, slug)
+        # NOTE: after triggers should update cached stats
         if not error:
-            logger.info(f'@{follower.slug} unfollowing @{slug}')
-            [author] = await get_authors_with_stat_cached(
-                select(Author).where(Author.slug == slug)
-            )
-            if not author:
-                return {'error': 'cant find author'}
-            _followers = await update_followers_for_author(follower, author, False)
+            logger.info(f'@{follower.slug} unfollowed @{slug}')
+            author_query = select(Author).where(Author.slug == slug)
+            [author] = local_session().execute(author_query)
             await notify_follower(follower.dict(), author.id, 'unfollow')
-            follows = await update_follows_for_author(
-                follower, 'author', author.dict(), False
-            )
     elif what == 'TOPIC':
         error = topic_unfollow(follower.id, slug)
-        if not error:
-            logger.info(f'@{follower.slug} unfollowing §{slug}')
-            [topic] = await get_topics_with_stat_cached(
-                select(Topic).where(Topic.slug == slug)
-            )
-            if not topic:
-                return {'error': 'cant find topic'}
-            follows = await update_follows_for_author(
-                follower, 'topic', topic.dict(), False
-            )
     elif what == 'COMMUNITY':
         follows = local_session().execute(select(Community))
     elif what == 'SHOUT':
         error = reactions_unfollow(follower.id, slug)
-        if not error:
-            logger.info(f'@{follower.slug} unfollowing §{slug}')
-            [shout] = local_session().execute(select(Shout).where(Shout.slug == slug))
-            if not shout:
-                return {'error': 'cant find shout'}
-            if not error:
-                follows = await update_follows_for_author(
-                    follower, 'shout', shout.dict(), False
-                )
-    return {'error': error, f'{what.lower()}s': follows}
+    entity = what.lower()
+    follows_str = await redis.execute('GET', f'author:{follower.id}:follows-{entity}s')
+    if follows_str:
+        follows = json.loads(follows_str)
+    return {'error': error, f'{entity}s': follows}


 async def get_follows_by_user_id(user_id: str):
@@ -321,7 +280,7 @@ async def get_topic_followers(_, _info, slug: str, topic_id: int) -> List[Author]:
         .join(Topic, Topic.id == TopicFollower.topic)
         .filter(or_(Topic.slug == slug, Topic.id == topic_id))
     )
-    return await get_authors_with_stat_cached(q)
+    return get_with_stat(q)


 @query.field('get_shout_followers')
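
With the changes above, follow() and unfollow() only perform the relational write; the response is whatever list is currently stored under author:{follower.id}:follows-{entity}s, and keeping that key fresh is delegated to the cache/trigger layer. A rough sketch of the read-back step under those assumptions, using a plain redis.asyncio client and the key names shown in the diff:

# Sketch of the post-mutation read-back; key layout taken from the diff,
# redis.asyncio used directly instead of the project's redis wrapper.
import json

import redis.asyncio as aioredis

redis_client = aioredis.Redis()


async def read_follows(follower_id: int, entity: str) -> list:
    """Return the cached follows list for 'author', 'topic', 'community' or 'shout'."""
    follows_str = await redis_client.get(f'author:{follower_id}:follows-{entity}s')
    return json.loads(follows_str) if follows_str else []


async def follow_response(follower_id: int, what: str, error=None) -> dict:
    # Mirrors the response shape in the diff: {'authors': [...]}, {'topics': [...]}, etc.
    if error:
        return {'error': error}
    entity = what.lower()
    return {f'{entity}s': await read_follows(follower_id, entity)}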

View File

@@ -1,5 +1,3 @@
-import json
-
 from sqlalchemy import and_, distinct, func, join, select
 from sqlalchemy.orm import aliased

@@ -7,10 +5,7 @@ from orm.author import Author, AuthorFollower
 from orm.reaction import Reaction, ReactionKind
 from orm.shout import Shout, ShoutAuthor, ShoutTopic
 from orm.topic import Topic, TopicFollower
-from resolvers.rating import add_author_rating_columns
 from services.db import local_session
-from services.logger import root_logger as logger
-from services.rediscache import redis


 def add_topic_stat_columns(q):
@@ -65,7 +60,7 @@ def add_topic_stat_columns(q):
     return q


-def add_author_stat_columns(q, with_rating=False):
+def add_author_stat_columns(q):
     aliased_shout_author = aliased(ShoutAuthor)
     aliased_authors = aliased(AuthorFollower)
     aliased_followers = aliased(AuthorFollower)
@@ -106,20 +101,17 @@ def add_author_stat_columns(q, with_rating=False):
     q = q.add_columns(sub_comments.c.comments_count)
     group_list = [Author.id, sub_comments.c.comments_count]

-    if with_rating:
-        q, group_list = add_author_rating_columns(q, group_list)
-
     q = q.group_by(*group_list)
     return q


-def get_with_stat(q, with_rating=False):
+def get_with_stat(q):
     try:
         is_author = f'{q}'.lower().startswith('select author')
         is_topic = f'{q}'.lower().startswith('select topic')
         if is_author:
-            q = add_author_stat_columns(q, with_rating)
+            q = add_author_stat_columns(q)
         elif is_topic:
             q = add_topic_stat_columns(q)
         records = []
@@ -133,11 +125,6 @@ def get_with_stat(q, with_rating=False):
                 stat['followers'] = cols[3]
                 if is_author:
                     stat['comments'] = cols[4]
-                    if with_rating:
-                        logger.debug(cols)
-                        stat['rating'] = cols[6]
-                        stat['rating_shouts'] = cols[7]
-                        stat['rating_comments'] = cols[8]
                 entity.stat = stat
                 records.append(entity)
     except Exception as exc:
@@ -148,41 +135,6 @@ def get_with_stat(q, with_rating=False):
     return records


-async def get_authors_with_stat_cached(q):
-    # logger.debug(q)
-    try:
-        records = []
-        with local_session() as session:
-            for [x] in session.execute(q):
-                stat_str = await redis.execute('GET', f'author:{x.id}')
-                x.stat = (
-                    json.loads(stat_str).get('stat')
-                    if isinstance(stat_str, str)
-                    else {}
-                )
-                records.append(x)
-    except Exception as exc:
-        raise Exception(exc)
-    return records
-
-
-async def get_topics_with_stat_cached(q):
-    try:
-        records = []
-        current = None
-        with local_session() as session:
-            for [x] in session.execute(q):
-                current = x
-                stat_str = await redis.execute('GET', f'topic:{x.id}')
-                if isinstance(stat_str, str):
-                    x.stat = json.loads(stat_str).get('stat')
-                records.append(x)
-    except Exception as exc:
-        logger.error(current)
-        raise Exception(exc)
-    return records
-
-
 def author_follows_authors(author_id: int):
     af = aliased(AuthorFollower, name='af')
     q = (
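
get_with_stat now always builds the stat columns, with the with_rating branch and rating columns removed. For context only, here is a generic way (not the project's exact SQL, which uses aliased joins and group_by as above) to attach a follower count to an author query with a correlated scalar subquery in SQLAlchemy 2.x, using toy models:

# Generic illustration: a follower-count column via a correlated scalar subquery.
# Table/column names loosely mirror the ORM models referenced in the diff.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, func, select
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    pass


class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True)
    slug = Column(String)


class AuthorFollower(Base):
    __tablename__ = 'author_follower'
    id = Column(Integer, primary_key=True)
    author = Column(Integer, ForeignKey('author.id'))
    follower = Column(Integer, ForeignKey('author.id'))


# Correlates automatically against the enclosing select(Author, ...)
followers_count = (
    select(func.count(AuthorFollower.id))
    .where(AuthorFollower.author == Author.id)
    .scalar_subquery()
    .label('followers')
)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    for author, followers in session.execute(select(Author, followers_count)):
        print(author.slug, followers)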

View File

@@ -1,12 +1,9 @@
-import asyncio
 import json

-from sqlalchemy import event, select
+from sqlalchemy import select

-from orm.author import Author, AuthorFollower
-from orm.reaction import Reaction
-from orm.shout import Shout, ShoutAuthor
-from orm.topic import Topic, TopicFollower
+from orm.author import Author
+from orm.topic import Topic
 from resolvers.stat import get_with_stat
 from services.encoders import CustomJSONEncoder
 from services.logger import root_logger as logger

@@ -19,7 +16,7 @@ DEFAULT_FOLLOWS = {
 }


-async def set_author_cache(author: dict):
+async def cache_author(author: dict):
     payload = json.dumps(author, cls=CustomJSONEncoder)
     await redis.execute('SET', f'user:{author.get("user")}', payload)
     await redis.execute('SET', f'author:{author.get("id")}', payload)
@@ -66,65 +63,9 @@ async def set_author_cache(author: dict):
             # author not found in the list, so add the new author with the updated stat field
             followed_author_followers.append(author)


-async def update_author_followers_cache(author_id: int, followers):
-    updated_followers = [f.dict() if isinstance(f, Author) else f for f in followers]
-    payload = json.dumps(
-        updated_followers,
-        cls=CustomJSONEncoder,
-    )
-    await redis.execute('SET', f'author:{author_id}:followers', payload)
-    author_str = await redis.execute('GET', f'author:{author_id}')
-    if author_str:
-        author = json.loads(author_str)
-        author['stat']['followers'] = len(updated_followers)
-        await set_author_cache(author)
-
-
-async def set_topic_cache(topic: dict):
-    payload = json.dumps(topic, cls=CustomJSONEncoder)
-    await redis.execute('SET', f'topic:{topic.get("id")}', payload)
-
-
-async def set_follows_topics_cache(follows, author_id: int):
-    try:
-        payload = json.dumps(
-            [a.dict() if isinstance(a, Author) else a for a in follows],
-            cls=CustomJSONEncoder,
-        )
-        await redis.execute('SET', f'author:{author_id}:follows-topics', payload)
-    except Exception as exc:
-        logger.error(exc)
-        import traceback
-        exc = traceback.format_exc()
-        logger.error(exc)
-
-
-async def set_follows_authors_cache(follows, author_id: int):
-    updated_follows = [a.dict() if isinstance(a, Author) else a for a in follows]
-    try:
-        payload = json.dumps(
-            updated_follows,
-            cls=CustomJSONEncoder,
-        )
-        await redis.execute('SET', f'author:{author_id}:follows-authors', payload)
-        # update author everywhere
-        author_str = await redis.execute('GET', f'author:{author_id}')
-        if author_str:
-            author = json.loads(author_str)
-            author['stat']['authors'] = len(updated_follows)
-            await set_author_cache(author)
-    except Exception as exc:
-        import traceback
-        logger.error(exc)
-        exc = traceback.format_exc()
-        logger.error(exc)
-
-
-async def update_follows_for_author(
-    follower: Author, entity_type: str, entity: dict, is_insert: bool
-):
+async def cache_follows(follower: Author, entity_type: str, entity, is_insert=True):
+    # prepare
     follows = []
     redis_key = f'author:{follower.id}:follows-{entity_type}s'
     follows_str = await redis.execute('GET', redis_key)
@@ -138,197 +79,66 @@ async def update_follows_for_author(
             raise Exception('wrong entity')
         # Remove the entity from follows
         follows = [e for e in follows if e['id'] != entity_id]
+        logger.debug(f'{entity['slug']} removed from what @{follower.slug} follows')

-    if entity_type == 'topic':
-        await set_follows_topics_cache(follows, follower.id)
-    if entity_type == 'author':
-        await set_follows_authors_cache(follows, follower.id)
+    # update follows cache
+    updated_data = [t.dict() if isinstance(t, Topic) else t for t in follows]
+    payload = json.dumps(updated_data, cls=CustomJSONEncoder)
+    await redis.execute('SET', redis_key, payload)
+
+    # update follower's stats everywhere
+    author_str = await redis.execute('GET', f'author:{follower.id}')
+    if author_str:
+        author = json.loads(author_str)
+        author['stat'][f'{entity_type}s'] = len(updated_data)
+        await cache_author(author)
     return follows


-async def update_followers_for_author(
-    follower: Author, author: Author, is_insert: bool
-):
+async def cache_follower(follower: Author, author: Author, is_insert=True):
     redis_key = f'author:{author.id}:followers'
     followers_str = await redis.execute('GET', redis_key)
     followers = []
     if isinstance(followers_str, str):
         followers = json.loads(followers_str)
     if is_insert:
-        followers.append(follower)
-    else:
         # Remove the entity from followers
         followers = [e for e in followers if e['id'] != author.id]
-    await update_author_followers_cache(author.id, followers)
+    else:
+        followers.append(follower)
+    updated_followers = [f.dict() if isinstance(f, Author) else f for f in followers]
+    payload = json.dumps(updated_followers, cls=CustomJSONEncoder)
+    await redis.execute('SET', redis_key, payload)
+    author_str = await redis.execute('GET', f'author:{follower.id}')
+    if author_str:
+        author = json.loads(author_str)
+        author['stat']['followers'] = len(updated_followers)
+        await cache_author(author)
     return followers


-def after_shout_update(_mapper, _connection, shout: Shout):
-    logger.info('after shout update')
-    # Main query to get authors associated with the shout through ShoutAuthor
-    authors_query = (
-        select(Author)
-        .select_from(ShoutAuthor)  # Select from ShoutAuthor
-        .join(Author, Author.id == ShoutAuthor.author)  # Join with Author
-        .filter(ShoutAuthor.shout == shout.id)  # Filter by shout.id
-    )
-    for author_with_stat in get_with_stat(authors_query):
-        asyncio.create_task(set_author_cache(author_with_stat.dict()))
-
-
-def after_reaction_update(mapper, connection, reaction: Reaction):
-    logger.info('after reaction update')
-    try:
-        author_subquery = select(Author).where(Author.id == reaction.created_by)
-        replied_author_subquery = (
-            select(Author)
-            .join(Reaction, Author.id == Reaction.created_by)
-            .where(Reaction.id == reaction.reply_to)
-        )
-        author_query = (
-            select(author_subquery.subquery())
-            .select_from(author_subquery.subquery())
-            .union(
-                select(replied_author_subquery.subquery()).select_from(
-                    replied_author_subquery.subquery()
-                )
-            )
-        )
-        for author_with_stat in get_with_stat(author_query):
-            asyncio.create_task(set_author_cache(author_with_stat.dict()))
-        shout = connection.execute(
-            select(Shout).select_from(Shout).where(Shout.id == reaction.shout)
-        ).first()
-        if shout:
-            after_shout_update(mapper, connection, shout)
-    except Exception as exc:
-        logger.error(exc)
-        import traceback
-        traceback.print_exc()
-
-
-def after_author_update(_mapper, _connection, author: Author):
-    logger.info('after author update')
-    q = select(Author).where(Author.id == author.id)
-    result = get_with_stat(q)
-    if result:
-        [author_with_stat] = result
-        asyncio.create_task(set_author_cache(author_with_stat.dict()))
-
-
-def after_topic_follower_insert(_mapper, _connection, target: TopicFollower):
-    logger.info(target)
-    asyncio.create_task(
-        handle_topic_follower_change(target.topic, target.follower, True)
-    )
-
-
-def after_topic_follower_delete(_mapper, _connection, target: TopicFollower):
-    logger.info(target)
-    asyncio.create_task(
-        handle_topic_follower_change(target.topic, target.follower, False)
-    )
-
-
-def after_author_follower_insert(_mapper, _connection, target: AuthorFollower):
-    logger.info(target)
-    asyncio.create_task(
-        handle_author_follower_change(target.author, target.follower, True)
-    )
-
-
-def after_author_follower_delete(_mapper, _connection, target: AuthorFollower):
-    logger.info(target)
-    asyncio.create_task(
-        handle_author_follower_change(target.author, target.follower, False)
-    )
-
-
-async def handle_author_follower_change(
-    author_id: int, follower_id: int, is_insert: bool
-):
+async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool):
     logger.info(author_id)
     author_query = select(Author).select_from(Author).filter(Author.id == author_id)
     [author] = get_with_stat(author_query)
     follower_query = select(Author).select_from(Author).filter(Author.id == follower_id)
     [follower] = get_with_stat(follower_query)
     if follower and author:
-        _ = asyncio.create_task(set_author_cache(author.dict()))
-        follows_authors = await redis.execute(
-            'GET', f'author:{follower_id}:follows-authors'
-        )
-        if isinstance(follows_authors, str):
-            follows_authors = json.loads(follows_authors)
-            if not any(x.get('id') == author.id for x in follows_authors):
-                follows_authors.append(author.dict())
-        _ = asyncio.create_task(set_follows_authors_cache(follows_authors, follower_id))
-        _ = asyncio.create_task(set_author_cache(follower.dict()))
-        await update_follows_for_author(
-            follower,
-            'author',
-            {
-                'id': author.id,
-                'name': author.name,
-                'slug': author.slug,
-                'pic': author.pic,
-                'bio': author.bio,
-                'stat': author.stat,
-            },
-            is_insert,
-        )
+        await cache_author(author.dict())
+        await cache_author(follower.dict())
+        await cache_follows(follower, 'author', author.dict(), is_insert)
+        await cache_follower(follower, author, is_insert)


-async def handle_topic_follower_change(
-    topic_id: int, follower_id: int, is_insert: bool
-):
+async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool):
     logger.info(topic_id)
     topic_query = select(Topic).filter(Topic.id == topic_id)
     [topic] = get_with_stat(topic_query)
     follower_query = select(Author).filter(Author.id == follower_id)
     [follower] = get_with_stat(follower_query)
     if follower and topic:
-        _ = asyncio.create_task(set_author_cache(follower.dict()))
-        follows_topics = await redis.execute(
-            'GET', f'author:{follower_id}:follows-topics'
-        )
-        if isinstance(follows_topics, str):
-            follows_topics = json.loads(follows_topics)
-            if not any(x.get('id') == topic.id for x in follows_topics):
-                follows_topics.append(topic)
-        _ = asyncio.create_task(set_follows_topics_cache(follows_topics, follower_id))
-        await update_follows_for_author(
-            follower,
-            'topic',
-            {
-                'id': topic.id,
-                'title': topic.title,
-                'slug': topic.slug,
-                'body': topic.body,
-                'stat': topic.stat,
-            },
-            is_insert,
-        )
+        await cache_author(follower.dict())
+        await redis.execute('SET', f'topic:{topic.id}', json.dumps(topic.dict(), cls=CustomJSONEncoder))
+        await cache_follows(follower, 'topic', topic.dict(), is_insert)


-def events_register():
-    event.listen(Shout, 'after_insert', after_shout_update)
-    event.listen(Shout, 'after_update', after_shout_update)
-    event.listen(Reaction, 'after_insert', after_reaction_update)
-    event.listen(Reaction, 'after_update', after_reaction_update)
-    event.listen(Author, 'after_insert', after_author_update)
-    event.listen(Author, 'after_update', after_author_update)
-    event.listen(AuthorFollower, 'after_insert', after_author_follower_insert)
-    event.listen(AuthorFollower, 'after_delete', after_author_follower_delete)
-    event.listen(TopicFollower, 'after_insert', after_topic_follower_insert)
-    event.listen(TopicFollower, 'after_delete', after_topic_follower_delete)
-    logger.info('cache events were registered!')
+# handle_author_follow and handle_topic_follow -> cache_author, cache_follows, cache_followers
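
The new cache_follows / cache_follower helpers are read-modify-write operations on JSON lists in Redis, followed by a counter bump on the cached author record (author['stat'][...] = len(...)). A condensed sketch of that pattern, with plain redis.asyncio standing in for the project's redis service and key names taken from the diff:

# Condensed read-modify-write sketch of the follows-list cache.
import json

import redis.asyncio as aioredis

redis_client = aioredis.Redis()


async def cache_follows_sketch(follower_id: int, entity_type: str, entity: dict, is_insert: bool):
    key = f'author:{follower_id}:follows-{entity_type}s'
    raw = await redis_client.get(key)
    follows = json.loads(raw) if raw else []

    if is_insert:
        follows.append(entity)                                     # follow: add the entity
    else:
        follows = [e for e in follows if e['id'] != entity['id']]  # unfollow: drop it

    await redis_client.set(key, json.dumps(follows))

    # keep the follower's cached stat counter in sync with the list length
    author_raw = await redis_client.get(f'author:{follower_id}')
    if author_raw:
        author = json.loads(author_raw)
        author.setdefault('stat', {})[f'{entity_type}s'] = len(follows)
        await redis_client.set(f'author:{follower_id}', json.dumps(author))
    return follows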

services/triggers.py (new file, 122 lines)
View File

@@ -0,0 +1,122 @@
import asyncio

from sqlalchemy import event, select

from orm.author import Author, AuthorFollower
from orm.reaction import Reaction
from orm.shout import Shout, ShoutAuthor
from orm.topic import TopicFollower
from resolvers.stat import get_with_stat
from services.logger import root_logger as logger
from services.cache import cache_author, handle_topic_follower_change, handle_author_follower_change

DEFAULT_FOLLOWS = {
    'topics': [],
    'authors': [],
    'communities': [{'id': 1, 'name': 'Дискурс', 'slug': 'discours', 'pic': ''}],
}


def after_shout_update(_mapper, _connection, shout: Shout):
    logger.info('after shout update')
    # Main query to get authors associated with the shout through ShoutAuthor
    authors_query = (
        select(Author)
        .select_from(ShoutAuthor)  # Select from ShoutAuthor
        .join(Author, Author.id == ShoutAuthor.author)  # Join with Author
        .filter(ShoutAuthor.shout == shout.id)  # Filter by shout.id
    )
    for author_with_stat in get_with_stat(authors_query):
        asyncio.create_task(cache_author(author_with_stat.dict()))


def after_reaction_update(mapper, connection, reaction: Reaction):
    logger.info('after reaction update')
    try:
        author_subquery = select(Author).where(Author.id == reaction.created_by)
        replied_author_subquery = (
            select(Author)
            .join(Reaction, Author.id == Reaction.created_by)
            .where(Reaction.id == reaction.reply_to)
        )
        author_query = (
            select(author_subquery.subquery())
            .select_from(author_subquery.subquery())
            .union(
                select(replied_author_subquery.subquery()).select_from(
                    replied_author_subquery.subquery()
                )
            )
        )
        for author_with_stat in get_with_stat(author_query):
            asyncio.create_task(cache_author(author_with_stat.dict()))
        shout = connection.execute(
            select(Shout).select_from(Shout).where(Shout.id == reaction.shout)
        ).first()
        if shout:
            after_shout_update(mapper, connection, shout)
    except Exception as exc:
        logger.error(exc)
        import traceback
        traceback.print_exc()


def after_author_update(_mapper, _connection, author: Author):
    logger.info('after author update')
    q = select(Author).where(Author.id == author.id)
    result = get_with_stat(q)
    if result:
        [author_with_stat] = result
        asyncio.create_task(cache_author(author_with_stat.dict()))


def after_topic_follower_insert(_mapper, _connection, target: TopicFollower):
    logger.info(target)
    asyncio.create_task(
        handle_topic_follower_change(target.topic, target.follower, True)
    )


def after_topic_follower_delete(_mapper, _connection, target: TopicFollower):
    logger.info(target)
    asyncio.create_task(
        handle_topic_follower_change(target.topic, target.follower, False)
    )


def after_author_follower_insert(_mapper, _connection, target: AuthorFollower):
    logger.info(target)
    asyncio.create_task(
        handle_author_follower_change(target.author, target.follower, True)
    )


def after_author_follower_delete(_mapper, _connection, target: AuthorFollower):
    logger.info(target)
    asyncio.create_task(
        handle_author_follower_change(target.author, target.follower, False)
    )


def events_register():
    event.listen(Shout, 'after_insert', after_shout_update)
    event.listen(Shout, 'after_update', after_shout_update)
    event.listen(Reaction, 'after_insert', after_reaction_update)
    event.listen(Reaction, 'after_update', after_reaction_update)
    event.listen(Author, 'after_insert', after_author_update)
    event.listen(Author, 'after_update', after_author_update)
    event.listen(AuthorFollower, 'after_insert', after_author_follower_insert)
    event.listen(AuthorFollower, 'after_delete', after_author_follower_delete)
    event.listen(TopicFollower, 'after_insert', after_topic_follower_insert)
    event.listen(TopicFollower, 'after_delete', after_topic_follower_delete)
    logger.info('cache events were registered!')
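
services/triggers.py wires SQLAlchemy ORM events to the async cache layer: each listener runs synchronously inside the flush and hands the actual Redis work to the event loop via asyncio.create_task, which assumes a loop is already running when commits happen. A standalone toy example of the same wiring (in-memory SQLite, a print-based cache_author stand-in, not the project's models):

# Standalone demo of the event.listen -> asyncio.create_task bridge.
import asyncio

from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    pass


class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True)
    slug = Column(String)


async def cache_author(author: dict):
    print('caching', author)          # stand-in for the Redis write


def after_author_insert(_mapper, _connection, target: Author):
    # Fires inside the synchronous flush; schedule the async cache refresh
    # on the running loop instead of awaiting here.
    asyncio.create_task(cache_author({'id': target.id, 'slug': target.slug}))


event.listen(Author, 'after_insert', after_author_insert)


async def main():
    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(Author(slug='demo'))
        session.commit()              # triggers after_insert -> create_task
    await asyncio.sleep(0)            # let the scheduled cache task run


asyncio.run(main())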