This commit is contained in:
parent
60a56fd098
commit
d4982017f6
181
cache/cache.py
vendored
181
cache/cache.py
vendored
|
@ -1,122 +1,58 @@
|
|||
import asyncio
|
||||
import json
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy import select, join, and_
|
||||
from orm.author import Author, AuthorFollower
|
||||
from orm.topic import Topic, TopicFollower
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from services.db import local_session
|
||||
from utils.encoders import CustomJSONEncoder
|
||||
from cache.rediscache import redis
|
||||
from services.redis import redis
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
DEFAULT_FOLLOWS = {
|
||||
"topics": [],
|
||||
"authors": [],
|
||||
"shouts": [],
|
||||
"communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}],
|
||||
}
|
||||
|
||||
|
||||
# Кэширование данных темы
|
||||
# Cache topic data
|
||||
async def cache_topic(topic: dict):
|
||||
payload = json.dumps(topic, cls=CustomJSONEncoder)
|
||||
# Одновременное кэширование по id и slug для быстрого доступа
|
||||
# Cache by id and slug for quick access
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"topic:id:{topic['id']}", payload),
|
||||
redis.execute("SET", f"topic:slug:{topic['slug']}", payload),
|
||||
)
|
||||
|
||||
|
||||
# Кэширование данных автора
|
||||
# Cache author data
|
||||
async def cache_author(author: dict):
|
||||
payload = json.dumps(author, cls=CustomJSONEncoder)
|
||||
# Кэширование данных автора по user и id
|
||||
# Cache author data by user and id
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"author:user:{author['user'].strip()}", str(author["id"])),
|
||||
redis.execute("SET", f"author:id:{author['id']}", payload),
|
||||
)
|
||||
|
||||
|
||||
async def get_cached_topic(topic_id: int):
|
||||
"""
|
||||
Получает информацию о теме из кэша или базы данных.
|
||||
|
||||
Args:
|
||||
topic_id (int): Идентификатор темы.
|
||||
|
||||
Returns:
|
||||
dict: Данные темы в формате словаря или None, если тема не найдена.
|
||||
"""
|
||||
# Ключ для кэширования темы в Redis
|
||||
topic_key = f"topic:id:{topic_id}"
|
||||
cached_topic = await redis.get(topic_key)
|
||||
if cached_topic:
|
||||
return json.loads(cached_topic)
|
||||
|
||||
# Если данных о теме нет в кэше, загружаем из базы данных
|
||||
with local_session() as session:
|
||||
topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none()
|
||||
if topic:
|
||||
# Кэшируем полученные данные
|
||||
topic_dict = topic.dict()
|
||||
await redis.set(topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder))
|
||||
return topic_dict
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def get_cached_shout_authors(shout_id: int):
|
||||
"""
|
||||
Retrieves a list of authors for a given shout from the cache or database if not present.
|
||||
|
||||
Args:
|
||||
shout_id (int): The ID of the shout for which to retrieve authors.
|
||||
|
||||
Returns:
|
||||
List[dict]: A list of dictionaries containing author data.
|
||||
"""
|
||||
# Attempt to retrieve cached author IDs for the shout
|
||||
rkey = f"shout:authors:{shout_id}"
|
||||
cached_author_ids = await redis.get(rkey)
|
||||
if cached_author_ids:
|
||||
author_ids = json.loads(cached_author_ids)
|
||||
else:
|
||||
# If not in cache, fetch from the database and cache the result
|
||||
with local_session() as session:
|
||||
query = (
|
||||
select(ShoutAuthor.author)
|
||||
.where(ShoutAuthor.shout == shout_id)
|
||||
.join(Author, ShoutAuthor.author == Author.id)
|
||||
.filter(Author.deleted_at.is_(None))
|
||||
)
|
||||
author_ids = [author_id for (author_id,) in session.execute(query).all()]
|
||||
await redis.execute("set", rkey, json.dumps(author_ids))
|
||||
|
||||
# Retrieve full author details from cached IDs
|
||||
if author_ids:
|
||||
authors = await get_cached_authors_by_ids(author_ids)
|
||||
logger.debug(f"Shout#{shout_id} authors fetched and cached: {len(authors)} authors found.")
|
||||
return authors
|
||||
|
||||
return []
|
||||
|
||||
|
||||
# Кэширование данных о подписках
|
||||
# Cache follows data
|
||||
async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True):
|
||||
key = f"author:follows-{entity_type}s:{follower_id}"
|
||||
follows_str = await redis.get(key)
|
||||
follows = json.loads(follows_str) if follows_str else []
|
||||
follows = json.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type]
|
||||
if is_insert:
|
||||
if entity_id not in follows:
|
||||
follows.append(entity_id)
|
||||
else:
|
||||
follows = [eid for eid in follows if eid != entity_id]
|
||||
await redis.execute("set", key, json.dumps(follows, cls=CustomJSONEncoder))
|
||||
update_follower_stat(follower_id, entity_type, len(follows))
|
||||
await update_follower_stat(follower_id, entity_type, len(follows))
|
||||
|
||||
|
||||
# Обновление статистики подписчика
|
||||
# Update follower statistics
|
||||
async def update_follower_stat(follower_id, entity_type, count):
|
||||
follower_key = f"author:id:{follower_id}"
|
||||
follower_str = await redis.get(follower_key)
|
||||
|
@ -126,13 +62,13 @@ async def update_follower_stat(follower_id, entity_type, count):
|
|||
await cache_author(follower)
|
||||
|
||||
|
||||
# Получение автора из кэша
|
||||
# Get author from cache
|
||||
async def get_cached_author(author_id: int):
|
||||
author_key = f"author:id:{author_id}"
|
||||
result = await redis.get(author_key)
|
||||
if result:
|
||||
return json.loads(result)
|
||||
# Загрузка из базы данных, если не найдено в кэше
|
||||
# Load from database if not found in cache
|
||||
with local_session() as session:
|
||||
author = session.execute(select(Author).where(Author.id == author_id)).scalar_one_or_none()
|
||||
if author:
|
||||
|
@ -141,13 +77,41 @@ async def get_cached_author(author_id: int):
|
|||
return None
|
||||
|
||||
|
||||
# Получение темы по slug из кэша
|
||||
# Function to get cached topic
|
||||
async def get_cached_topic(topic_id: int):
|
||||
"""
|
||||
Fetch topic data from cache or database by id.
|
||||
|
||||
Args:
|
||||
topic_id (int): The identifier for the topic.
|
||||
|
||||
Returns:
|
||||
dict: Topic data or None if not found.
|
||||
"""
|
||||
topic_key = f"topic:id:{topic_id}"
|
||||
cached_topic = await redis.get(topic_key)
|
||||
if cached_topic:
|
||||
return json.loads(cached_topic)
|
||||
|
||||
# If not in cache, fetch from the database
|
||||
with local_session() as session:
|
||||
topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none()
|
||||
if topic:
|
||||
topic_dict = topic.dict()
|
||||
await redis.set(topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder))
|
||||
return topic_dict
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
||||
# Get topic by slug from cache
|
||||
async def get_cached_topic_by_slug(slug: str):
|
||||
topic_key = f"topic:slug:{slug}"
|
||||
result = await redis.get(topic_key)
|
||||
if result:
|
||||
return json.loads(result)
|
||||
# Загрузка из базы данных, если не найдено в кэше
|
||||
# Load from database if not found in cache
|
||||
with local_session() as session:
|
||||
topic = session.execute(select(Topic).where(Topic.slug == slug)).scalar_one_or_none()
|
||||
if topic:
|
||||
|
@ -156,13 +120,13 @@ async def get_cached_topic_by_slug(slug: str):
|
|||
return None
|
||||
|
||||
|
||||
# Получение списка авторов по ID из кэша
|
||||
# Get list of authors by ID from cache
|
||||
async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]:
|
||||
# Одновременное получение данных всех авторов
|
||||
# Fetch all author data concurrently
|
||||
keys = [f"author:id:{author_id}" for author_id in author_ids]
|
||||
results = await asyncio.gather(*(redis.get(key) for key in keys))
|
||||
authors = [json.loads(result) if result else None for result in results]
|
||||
# Загрузка отсутствующих авторов из базы данных и кэширование
|
||||
# Load missing authors from database and cache
|
||||
missing_indices = [index for index, author in enumerate(authors) if author is None]
|
||||
if missing_indices:
|
||||
missing_ids = [author_ids[index] for index in missing_indices]
|
||||
|
@ -170,19 +134,21 @@ async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]:
|
|||
query = select(Author).where(Author.id.in_(missing_ids))
|
||||
missing_authors = session.execute(query).scalars().all()
|
||||
await asyncio.gather(*(cache_author(author.dict()) for author in missing_authors))
|
||||
authors = [author.dict() for author in missing_authors]
|
||||
for index, author in zip(missing_indices, missing_authors):
|
||||
authors[index] = author.dict()
|
||||
return authors
|
||||
|
||||
|
||||
# Get cached topic followers
|
||||
async def get_cached_topic_followers(topic_id: int):
|
||||
# Попытка извлечь кэшированные данные
|
||||
# Attempt to retrieve cached data
|
||||
cached = await redis.get(f"topic:followers:{topic_id}")
|
||||
if cached:
|
||||
followers = json.loads(cached)
|
||||
logger.debug(f"Cached followers for topic#{topic_id}: {len(followers)}")
|
||||
return followers
|
||||
|
||||
# Загрузка из базы данных и кэширование результатов
|
||||
# Load from database and cache results
|
||||
with local_session() as session:
|
||||
followers_ids = [
|
||||
f[0]
|
||||
|
@ -196,8 +162,9 @@ async def get_cached_topic_followers(topic_id: int):
|
|||
return followers
|
||||
|
||||
|
||||
# Get cached author followers
|
||||
async def get_cached_author_followers(author_id: int):
|
||||
# Проверяем кэш на наличие данных
|
||||
# Check cache for data
|
||||
cached = await redis.get(f"author:followers:{author_id}")
|
||||
if cached:
|
||||
followers_ids = json.loads(cached)
|
||||
|
@ -205,7 +172,7 @@ async def get_cached_author_followers(author_id: int):
|
|||
logger.debug(f"Cached followers for author#{author_id}: {len(followers)}")
|
||||
return followers
|
||||
|
||||
# Запрос в базу данных если кэш пуст
|
||||
# Query database if cache is empty
|
||||
with local_session() as session:
|
||||
followers_ids = [
|
||||
f[0]
|
||||
|
@ -219,13 +186,14 @@ async def get_cached_author_followers(author_id: int):
|
|||
return followers
|
||||
|
||||
|
||||
# Get cached follower authors
|
||||
async def get_cached_follower_authors(author_id: int):
|
||||
# Попытка получить авторов из кэша
|
||||
# Attempt to retrieve authors from cache
|
||||
cached = await redis.get(f"author:follows-authors:{author_id}")
|
||||
if cached:
|
||||
authors_ids = json.loads(cached)
|
||||
else:
|
||||
# Запрос авторов из базы данных
|
||||
# Query authors from database
|
||||
with local_session() as session:
|
||||
authors_ids = [
|
||||
a[0]
|
||||
|
@ -241,13 +209,14 @@ async def get_cached_follower_authors(author_id: int):
|
|||
return authors
|
||||
|
||||
|
||||
# Get cached follower topics
|
||||
async def get_cached_follower_topics(author_id: int):
|
||||
# Попытка получить темы из кэша
|
||||
# Attempt to retrieve topics from cache
|
||||
cached = await redis.get(f"author:follows-topics:{author_id}")
|
||||
if cached:
|
||||
topics_ids = json.loads(cached)
|
||||
else:
|
||||
# Загрузка тем из базы данных и их кэширование
|
||||
# Load topics from database and cache them
|
||||
with local_session() as session:
|
||||
topics_ids = [
|
||||
t[0]
|
||||
|
@ -270,30 +239,31 @@ async def get_cached_follower_topics(author_id: int):
|
|||
return topics
|
||||
|
||||
|
||||
# Get author by user ID from cache
|
||||
async def get_cached_author_by_user_id(user_id: str):
|
||||
"""
|
||||
Получает информацию об авторе по его user_id, сначала проверяя кэш, а затем базу данных.
|
||||
Retrieve author information by user_id, checking the cache first, then the database.
|
||||
|
||||
Args:
|
||||
user_id (str): Идентификатор пользователя, по которому нужно получить автора.
|
||||
user_id (str): The user identifier for which to retrieve the author.
|
||||
|
||||
Returns:
|
||||
dict: Словарь с данными автора или None, если автор не найден.
|
||||
dict: Dictionary with author data or None if not found.
|
||||
"""
|
||||
# Пытаемся найти ID автора по user_id в кэше Redis
|
||||
# Attempt to find author ID by user_id in Redis cache
|
||||
author_id = await redis.get(f"author:user:{user_id.strip()}")
|
||||
if author_id:
|
||||
# Если ID найден, получаем полные данные автора по его ID
|
||||
# If ID is found, get full author data by ID
|
||||
author_data = await redis.get(f"author:id:{author_id}")
|
||||
if author_data:
|
||||
return json.loads(author_data)
|
||||
|
||||
# Если данные в кэше не найдены, выполняем запрос к базе данных
|
||||
# If data is not found in cache, query the database
|
||||
with local_session() as session:
|
||||
author = session.execute(select(Author).where(Author.user == user_id)).scalar_one_or_none()
|
||||
|
||||
if author:
|
||||
# Кэшируем полученные данные автора
|
||||
# Cache the retrieved author data
|
||||
author_dict = author.dict()
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"author:user:{user_id.strip()}", str(author.id)),
|
||||
|
@ -301,27 +271,28 @@ async def get_cached_author_by_user_id(user_id: str):
|
|||
)
|
||||
return author_dict
|
||||
|
||||
# Возвращаем None, если автор не найден
|
||||
# Return None if author is not found
|
||||
return None
|
||||
|
||||
|
||||
# Get cached topic authors
|
||||
async def get_cached_topic_authors(topic_id: int):
|
||||
"""
|
||||
Получает список авторов для заданной темы, используя кэш или базу данных.
|
||||
Retrieve a list of authors for a given topic, using cache or database.
|
||||
|
||||
Args:
|
||||
topic_id (int): Идентификатор темы, для которой нужно получить авторов.
|
||||
topic_id (int): The identifier of the topic for which to retrieve authors.
|
||||
|
||||
Returns:
|
||||
List[dict]: Список словарей, содержащих данные авторов.
|
||||
List[dict]: A list of dictionaries containing author data.
|
||||
"""
|
||||
# Пытаемся получить список ID авторов из кэша
|
||||
# Attempt to get a list of author IDs from cache
|
||||
rkey = f"topic:authors:{topic_id}"
|
||||
cached_authors_ids = await redis.get(rkey)
|
||||
if cached_authors_ids:
|
||||
authors_ids = json.loads(cached_authors_ids)
|
||||
else:
|
||||
# Если кэш пуст, получаем данные из базы данных
|
||||
# If cache is empty, get data from the database
|
||||
with local_session() as session:
|
||||
query = (
|
||||
select(ShoutAuthor.author)
|
||||
|
@ -330,10 +301,10 @@ async def get_cached_topic_authors(topic_id: int):
|
|||
.where(and_(ShoutTopic.topic == topic_id, Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
|
||||
)
|
||||
authors_ids = [author_id for (author_id,) in session.execute(query).all()]
|
||||
# Кэшируем полученные ID авторов
|
||||
# Cache the retrieved author IDs
|
||||
await redis.execute("set", rkey, json.dumps(authors_ids))
|
||||
|
||||
# Получаем полные данные авторов по кэшированным ID
|
||||
# Retrieve full author details from cached IDs
|
||||
if authors_ids:
|
||||
authors = await get_cached_authors_by_ids(authors_ids)
|
||||
logger.debug(f"Topic#{topic_id} authors fetched and cached: {len(authors)} authors found.")
|
||||
|
|
112
cache/precache.py
vendored
112
cache/precache.py
vendored
|
@ -1,67 +1,51 @@
|
|||
import json
|
||||
|
||||
import asyncio
|
||||
from sqlalchemy import and_, join, select
|
||||
|
||||
from orm.author import Author, AuthorFollower
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower
|
||||
from orm.topic import Topic, TopicFollower
|
||||
from resolvers.stat import get_with_stat
|
||||
from cache.cache import cache_author, cache_topic
|
||||
from services.db import local_session
|
||||
from utils.encoders import CustomJSONEncoder
|
||||
from utils.logger import root_logger as logger
|
||||
from cache.rediscache import redis
|
||||
from services.redis import redis
|
||||
|
||||
|
||||
# Предварительное кеширование подписчиков автора
|
||||
async def precache_authors_followers(author_id, session):
|
||||
# Precache author followers
|
||||
authors_followers = set()
|
||||
followers_query = select(AuthorFollower.follower).where(AuthorFollower.author == author_id)
|
||||
result = session.execute(followers_query)
|
||||
authors_followers.update(row[0] for row in result if row[0])
|
||||
|
||||
for row in result:
|
||||
follower_id = row[0]
|
||||
if follower_id:
|
||||
authors_followers.add(follower_id)
|
||||
|
||||
followers_payload = json.dumps(
|
||||
[f for f in authors_followers],
|
||||
cls=CustomJSONEncoder,
|
||||
)
|
||||
followers_payload = json.dumps(list(authors_followers), cls=CustomJSONEncoder)
|
||||
await redis.execute("SET", f"author:followers:{author_id}", followers_payload)
|
||||
|
||||
|
||||
# Предварительное кеширование подписок автора
|
||||
async def precache_authors_follows(author_id, session):
|
||||
# Precache topics followed by author
|
||||
follows_topics = set()
|
||||
follows_topics_query = select(TopicFollower.topic).where(TopicFollower.follower == author_id)
|
||||
result = session.execute(follows_topics_query)
|
||||
|
||||
for row in result:
|
||||
followed_topic_id = row[0]
|
||||
if followed_topic_id:
|
||||
follows_topics.add(followed_topic_id)
|
||||
|
||||
topics_payload = json.dumps([t for t in follows_topics], cls=CustomJSONEncoder)
|
||||
await redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload)
|
||||
|
||||
# Precache authors followed by author
|
||||
follows_authors = set()
|
||||
follows_authors_query = select(AuthorFollower.author).where(AuthorFollower.follower == author_id)
|
||||
result = session.execute(follows_authors_query)
|
||||
follows_shouts_query = select(ShoutReactionsFollower.shout).where(ShoutReactionsFollower.follower == author_id)
|
||||
|
||||
for row in result:
|
||||
followed_author_id = row[0]
|
||||
if followed_author_id:
|
||||
follows_authors.add(followed_author_id)
|
||||
follows_topics = {row[0] for row in session.execute(follows_topics_query) if row[0]}
|
||||
follows_authors = {row[0] for row in session.execute(follows_authors_query) if row[0]}
|
||||
follows_shouts = {row[0] for row in session.execute(follows_shouts_query) if row[0]}
|
||||
|
||||
authors_payload = json.dumps([a for a in follows_authors], cls=CustomJSONEncoder)
|
||||
await redis.execute("SET", f"author:follows-authors:{author_id}", authors_payload)
|
||||
topics_payload = json.dumps(list(follows_topics), cls=CustomJSONEncoder)
|
||||
authors_payload = json.dumps(list(follows_authors), cls=CustomJSONEncoder)
|
||||
shouts_payload = json.dumps(list(follows_shouts), cls=CustomJSONEncoder)
|
||||
|
||||
await asyncio.gather(
|
||||
redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload),
|
||||
redis.execute("SET", f"author:follows-authors:{author_id}", authors_payload),
|
||||
redis.execute("SET", f"author:follows-shouts:{author_id}", shouts_payload),
|
||||
)
|
||||
|
||||
|
||||
# Предварительное кеширование авторов тем
|
||||
async def precache_topics_authors(topic_id: int, session):
|
||||
# Precache topic authors
|
||||
topic_authors = set()
|
||||
topic_authors_query = (
|
||||
select(ShoutAuthor.author)
|
||||
.select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id))
|
||||
|
@ -74,29 +58,18 @@ async def precache_topics_authors(topic_id: int, session):
|
|||
)
|
||||
)
|
||||
)
|
||||
result = session.execute(topic_authors_query)
|
||||
topic_authors = {row[0] for row in session.execute(topic_authors_query) if row[0]}
|
||||
|
||||
for row in result:
|
||||
author_id = row[0]
|
||||
if author_id:
|
||||
topic_authors.add(author_id)
|
||||
|
||||
authors_payload = json.dumps([a for a in topic_authors], cls=CustomJSONEncoder)
|
||||
authors_payload = json.dumps(list(topic_authors), cls=CustomJSONEncoder)
|
||||
await redis.execute("SET", f"topic:authors:{topic_id}", authors_payload)
|
||||
|
||||
|
||||
# Предварительное кеширование подписчиков тем
|
||||
async def precache_topics_followers(topic_id: int, session):
|
||||
# Precache topic followers
|
||||
topic_followers = set()
|
||||
followers_query = select(TopicFollower.follower).where(TopicFollower.topic == topic_id)
|
||||
result = session.execute(followers_query)
|
||||
topic_followers = {row[0] for row in session.execute(followers_query) if row[0]}
|
||||
|
||||
for row in result:
|
||||
follower_id = row[0]
|
||||
if follower_id:
|
||||
topic_followers.add(follower_id)
|
||||
|
||||
followers_payload = json.dumps([f for f in topic_followers], cls=CustomJSONEncoder)
|
||||
followers_payload = json.dumps(list(topic_followers), cls=CustomJSONEncoder)
|
||||
await redis.execute("SET", f"topic:followers:{topic_id}", followers_payload)
|
||||
|
||||
|
||||
|
@ -106,45 +79,32 @@ async def precache_data():
|
|||
await redis.execute("FLUSHDB")
|
||||
logger.info("redis flushed")
|
||||
|
||||
with local_session() as session:
|
||||
# topics
|
||||
topics_by_id = {}
|
||||
topics = get_with_stat(select(Topic))
|
||||
for topic in topics:
|
||||
topic_profile = topic.dict() if not isinstance(topic, dict) else topic
|
||||
await cache_topic(topic_profile)
|
||||
logger.info(f"{len(topics)} topics precached")
|
||||
|
||||
# followings for topics
|
||||
with local_session() as session:
|
||||
for topic_id in topics_by_id.keys():
|
||||
await precache_topics_followers(topic_id, session)
|
||||
await precache_topics_authors(topic_id, session)
|
||||
logger.info("topics followings precached")
|
||||
await asyncio.gather(
|
||||
precache_topics_followers(topic["id"], session), precache_topics_authors(topic["id"], session)
|
||||
)
|
||||
logger.info(f"{len(topics)} topics and their followings precached")
|
||||
|
||||
# authors
|
||||
authors_by_id = {}
|
||||
authors = get_with_stat(select(Author).where(Author.user.is_not(None)))
|
||||
logger.debug(f"{len(authors)} authors found in database")
|
||||
c = 0
|
||||
for author in authors:
|
||||
if isinstance(author, Author):
|
||||
profile = author.dict()
|
||||
author_id = profile.get("id")
|
||||
user_id = profile.get("user", "").strip()
|
||||
if author_id and user_id:
|
||||
authors_by_id[author_id] = profile
|
||||
await cache_author(profile)
|
||||
c += 1
|
||||
await asyncio.gather(
|
||||
precache_authors_followers(author_id, session), precache_authors_follows(author_id, session)
|
||||
)
|
||||
else:
|
||||
logger.error(f"fail caching {author}")
|
||||
|
||||
logger.info(f"{c} authors precached")
|
||||
|
||||
# followings for authors
|
||||
with local_session() as session:
|
||||
for author_id in authors_by_id.keys():
|
||||
await precache_authors_followers(author_id, session)
|
||||
await precache_authors_follows(author_id, session)
|
||||
logger.info("authors followings precached")
|
||||
logger.info(f"{len(authors)} authors and their followings precached")
|
||||
except Exception as exc:
|
||||
logger.error(exc)
|
||||
logger.error(f"Error in precache_data: {exc}")
|
||||
|
|
61
cache/revalidator.py
vendored
61
cache/revalidator.py
vendored
|
@ -1,48 +1,55 @@
|
|||
import asyncio
|
||||
from cache.cache import get_cached_author, cache_author, get_cached_topic, cache_topic
|
||||
from utils.logger import root_logger as logger
|
||||
from cache.cache import get_cached_author, cache_author, cache_topic, get_cached_topic
|
||||
|
||||
|
||||
class CacheRevalidationManager:
|
||||
"""Управление периодической ревалидацией кэша."""
|
||||
def __init__(self, interval=60):
|
||||
"""Инициализация менеджера с заданным интервалом проверки (в секундах)."""
|
||||
self.interval = interval
|
||||
self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()}
|
||||
self.lock = asyncio.Lock()
|
||||
self.running = True
|
||||
|
||||
def __init__(self):
|
||||
self.items_to_revalidate = {"authors": set(), "topics": set()}
|
||||
self.revalidation_interval = 60 # Интервал ревалидации в секундах
|
||||
self.loop = None
|
||||
|
||||
def start(self):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.loop.run_until_complete(self.revalidate_cache())
|
||||
self.loop.run_forever()
|
||||
logger.info("[services.revalidator] started infinite loop")
|
||||
async def start(self):
|
||||
"""Запуск фонового воркера для ревалидации кэша."""
|
||||
asyncio.create_task(self.revalidate_cache())
|
||||
|
||||
async def revalidate_cache(self):
|
||||
"""Периодическая ревалидация кэша."""
|
||||
while True:
|
||||
await asyncio.sleep(self.revalidation_interval)
|
||||
"""Циклическая проверка и ревалидация кэша каждые self.interval секунд."""
|
||||
try:
|
||||
while self.running:
|
||||
await asyncio.sleep(self.interval)
|
||||
await self.process_revalidation()
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Revalidation worker was stopped.")
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred in the revalidation worker: {e}")
|
||||
|
||||
async def process_revalidation(self):
|
||||
"""Ревалидация кэша для отмеченных сущностей."""
|
||||
for entity_type, ids in self.items_to_revalidate.items():
|
||||
for entity_id in ids:
|
||||
if entity_type == "authors":
|
||||
# Ревалидация кэша автора
|
||||
author = await get_cached_author(entity_id)
|
||||
"""Обновление кэша для всех сущностей, требующих ревалидации."""
|
||||
async with self.lock:
|
||||
# Ревалидация кэша авторов
|
||||
for author_id in self.items_to_revalidate["authors"]:
|
||||
author = await get_cached_author(author_id)
|
||||
if author:
|
||||
await cache_author(author)
|
||||
elif entity_type == "topics":
|
||||
# Ревалидация кэша темы
|
||||
topic = await get_cached_topic(entity_id)
|
||||
self.items_to_revalidate["authors"].clear()
|
||||
|
||||
# Ревалидация кэша тем
|
||||
for topic_id in self.items_to_revalidate["topics"]:
|
||||
topic = await get_cached_topic(topic_id)
|
||||
if topic:
|
||||
await cache_topic(topic)
|
||||
ids.clear()
|
||||
self.items_to_revalidate["topics"].clear()
|
||||
|
||||
def mark_for_revalidation(self, entity_id, entity_type):
|
||||
"""Отметить сущность для ревалидации."""
|
||||
self.items_to_revalidate[entity_type].add(entity_id)
|
||||
|
||||
def stop(self):
|
||||
"""Остановка фонового воркера."""
|
||||
self.running = False
|
||||
|
||||
# Инициализация менеджера ревалидации
|
||||
revalidation_manager = CacheRevalidationManager()
|
||||
|
||||
revalidation_manager = CacheRevalidationManager(interval=300) # Ревалидация каждые 5 минут
|
||||
|
|
111
cache/triggers.py
vendored
111
cache/triggers.py
vendored
|
@ -1,85 +1,68 @@
|
|||
from sqlalchemy import event
|
||||
from orm.author import Author, AuthorFollower
|
||||
from orm.reaction import Reaction
|
||||
from orm.shout import Shout, ShoutAuthor
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutReactionsFollower
|
||||
from orm.topic import Topic, TopicFollower
|
||||
from cache.revalidator import revalidation_manager
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
|
||||
def after_update_handler(mapper, connection, target):
|
||||
"""Обработчик обновления сущности."""
|
||||
entity_type = "authors" if isinstance(target, Author) else "topics" if isinstance(target, Topic) else "shouts"
|
||||
revalidation_manager.mark_for_revalidation(target.id, entity_type)
|
||||
def mark_for_revalidation(entity, *args):
|
||||
"""Отметка сущности для ревалидации."""
|
||||
entity_type = (
|
||||
"authors"
|
||||
if isinstance(entity, Author)
|
||||
else "topics"
|
||||
if isinstance(entity, Topic)
|
||||
else "reactions"
|
||||
if isinstance(entity, Reaction)
|
||||
else "shouts"
|
||||
if isinstance(entity, Shout)
|
||||
else None
|
||||
)
|
||||
if entity_type:
|
||||
revalidation_manager.mark_for_revalidation(entity.id, entity_type)
|
||||
|
||||
|
||||
def after_follower_insert_update_handler(mapper, connection, target):
|
||||
"""Обработчик добавления или обновления подписки."""
|
||||
def after_follower_handler(mapper, connection, target, is_delete=False):
|
||||
"""Обработчик добавления, обновления или удаления подписки."""
|
||||
entity_type = None
|
||||
if isinstance(target, AuthorFollower):
|
||||
# Пометить автора и подписчика для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
|
||||
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
|
||||
entity_type = "authors"
|
||||
elif isinstance(target, TopicFollower):
|
||||
# Пометить тему и подписчика для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(target.topic_id, "topics")
|
||||
entity_type = "topics"
|
||||
elif isinstance(target, ShoutReactionsFollower):
|
||||
entity_type = "shouts"
|
||||
|
||||
if entity_type:
|
||||
revalidation_manager.mark_for_revalidation(
|
||||
target.author_id if entity_type == "authors" else target.topic_id, entity_type
|
||||
)
|
||||
if not is_delete:
|
||||
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
|
||||
|
||||
|
||||
def after_follower_delete_handler(mapper, connection, target):
|
||||
"""Обработчик удаления подписки."""
|
||||
if isinstance(target, AuthorFollower):
|
||||
# Пометить автора и подписчика для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
|
||||
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
|
||||
elif isinstance(target, TopicFollower):
|
||||
# Пометить тему и подписчика для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(target.topic_id, "topics")
|
||||
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
|
||||
|
||||
|
||||
def after_reaction_update_handler(mapper, connection, reaction):
|
||||
"""Обработчик изменений реакций."""
|
||||
# Пометить shout для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(reaction.shout_id, "shouts")
|
||||
# Пометить автора реакции для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(reaction.created_by, "authors")
|
||||
|
||||
|
||||
def after_shout_author_insert_update_handler(mapper, connection, target):
|
||||
"""Обработчик добавления или обновления авторства публикации."""
|
||||
# Пометить shout и автора для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(target.shout_id, "shouts")
|
||||
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
|
||||
|
||||
|
||||
def after_shout_author_delete_handler(mapper, connection, target):
|
||||
"""Обработчик удаления авторства публикации."""
|
||||
# Пометить shout и автора для ревалидации
|
||||
revalidation_manager.mark_for_revalidation(target.shout_id, "shouts")
|
||||
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
|
||||
|
||||
|
||||
def events_register():
|
||||
"""Регистрация обработчиков событий для всех сущностей."""
|
||||
event.listen(ShoutAuthor, "after_insert", after_shout_author_insert_update_handler)
|
||||
event.listen(ShoutAuthor, "after_update", after_shout_author_insert_update_handler)
|
||||
event.listen(ShoutAuthor, "after_delete", after_shout_author_delete_handler)
|
||||
event.listen(ShoutAuthor, "after_insert", mark_for_revalidation)
|
||||
event.listen(ShoutAuthor, "after_update", mark_for_revalidation)
|
||||
event.listen(ShoutAuthor, "after_delete", mark_for_revalidation)
|
||||
|
||||
event.listen(AuthorFollower, "after_insert", after_follower_insert_update_handler)
|
||||
event.listen(AuthorFollower, "after_update", after_follower_insert_update_handler)
|
||||
event.listen(AuthorFollower, "after_delete", after_follower_delete_handler)
|
||||
event.listen(TopicFollower, "after_insert", after_follower_insert_update_handler)
|
||||
event.listen(TopicFollower, "after_update", after_follower_insert_update_handler)
|
||||
event.listen(TopicFollower, "after_delete", after_follower_delete_handler)
|
||||
event.listen(Reaction, "after_update", after_reaction_update_handler)
|
||||
event.listen(AuthorFollower, "after_insert", after_follower_handler)
|
||||
event.listen(AuthorFollower, "after_update", after_follower_handler)
|
||||
event.listen(AuthorFollower, "after_delete", lambda *args: after_follower_handler(*args, is_delete=True))
|
||||
|
||||
event.listen(Author, "after_update", after_update_handler)
|
||||
event.listen(Topic, "after_update", after_update_handler)
|
||||
event.listen(Shout, "after_update", after_update_handler)
|
||||
event.listen(
|
||||
Reaction,
|
||||
"after_update",
|
||||
lambda mapper, connection, target: revalidation_manager.mark_for_revalidation(target.shout, "shouts"),
|
||||
)
|
||||
event.listen(TopicFollower, "after_insert", after_follower_handler)
|
||||
event.listen(TopicFollower, "after_update", after_follower_handler)
|
||||
event.listen(TopicFollower, "after_delete", lambda *args: after_follower_handler(*args, is_delete=True))
|
||||
|
||||
event.listen(ShoutReactionsFollower, "after_insert", after_follower_handler)
|
||||
event.listen(ShoutReactionsFollower, "after_update", after_follower_handler)
|
||||
event.listen(ShoutReactionsFollower, "after_delete", lambda *args: after_follower_handler(*args, is_delete=True))
|
||||
|
||||
event.listen(Reaction, "after_update", mark_for_revalidation)
|
||||
event.listen(Author, "after_update", mark_for_revalidation)
|
||||
event.listen(Topic, "after_update", mark_for_revalidation)
|
||||
event.listen(Shout, "after_update", mark_for_revalidation)
|
||||
|
||||
logger.info("Event handlers registered successfully.")
|
||||
|
|
24
cache/unread.py
vendored
24
cache/unread.py
vendored
|
@ -1,24 +0,0 @@
|
|||
import json
|
||||
|
||||
from cache.rediscache import redis
|
||||
|
||||
|
||||
async def get_unread_counter(chat_id: str, author_id: int) -> int:
|
||||
r = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}")
|
||||
if isinstance(r, str):
|
||||
return int(r)
|
||||
elif isinstance(r, int):
|
||||
return r
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
async def get_total_unread_counter(author_id: int) -> int:
|
||||
chats_set = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")
|
||||
s = 0
|
||||
if isinstance(chats_set, str):
|
||||
chats_set = json.loads(chats_set)
|
||||
if isinstance(chats_set, list):
|
||||
for chat_id in chats_set:
|
||||
s += await get_unread_counter(chat_id, author_id)
|
||||
return s
|
2
main.py
2
main.py
|
@ -14,7 +14,7 @@ from services.sentry import start_sentry
|
|||
from services.viewed import ViewedStorage
|
||||
from services.webhook import WebhookEndpoint
|
||||
from cache.precache import precache_data
|
||||
from cache.rediscache import redis
|
||||
from services.redis import redis
|
||||
from cache.revalidator import revalidation_manager
|
||||
from settings import DEV_SERVER_PID_FILE_NAME, MODE
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ import json
|
|||
from orm.notification import Notification
|
||||
from services.db import local_session
|
||||
from utils.logger import root_logger as logger
|
||||
from cache.rediscache import redis
|
||||
from services.redis import redis
|
||||
|
||||
|
||||
def save_notification(action: str, entity: str, payload):
|
||||
|
|
|
@ -9,7 +9,7 @@ logger = logging.getLogger("redis")
|
|||
logger.setLevel(logging.WARNING)
|
||||
|
||||
|
||||
class RedisCache:
|
||||
class RedisService:
|
||||
def __init__(self, uri=REDIS_URL):
|
||||
self._uri: str = uri
|
||||
self.pubsub_channels = []
|
||||
|
@ -58,6 +58,6 @@ class RedisCache:
|
|||
await self._client.publish(channel, data)
|
||||
|
||||
|
||||
redis = RedisCache()
|
||||
redis = RedisService()
|
||||
|
||||
__all__ = ["redis"]
|
|
@ -6,7 +6,7 @@ import os
|
|||
from opensearchpy import OpenSearch
|
||||
|
||||
from utils.encoders import CustomJSONEncoder
|
||||
from cache.rediscache import redis
|
||||
from services.redis import redis
|
||||
|
||||
# Set redis logging level to suppress DEBUG messages
|
||||
logger = logging.getLogger("search")
|
||||
|
|
|
@ -13,7 +13,7 @@ from orm.author import Author
|
|||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.topic import Topic
|
||||
from services.db import local_session
|
||||
from services.logger import root_logger as logger
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH") or "/dump/google-service.json"
|
||||
GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID")
|
||||
|
|
Loading…
Reference in New Issue
Block a user