From 615f1fe4682c0ca0cb9d456a7910b4b29f0bb15b Mon Sep 17 00:00:00 2001 From: Untone Date: Sat, 22 Mar 2025 11:47:19 +0300 Subject: [PATCH] topics+authors-reimplemented-cache --- CHANGELOG.md | 74 ++++++--- cache/cache.py | 240 ++++++++++++++++++++------- cache/memorycache.py | 181 -------------------- cache/precache.py | 15 +- cache/revalidator.py | 122 ++++++++++++-- docs/caching.md | 279 +++++++++++++++++++++++++++++++ docs/features.md | 10 +- orm/author.py | 84 +++++++++- resolvers/author.py | 381 ++++++++++++++++++++++++++++++------------- resolvers/topic.py | 260 +++++++++++++++-------------- services/db.py | 2 +- 11 files changed, 1127 insertions(+), 521 deletions(-) delete mode 100644 cache/memorycache.py create mode 100644 docs/caching.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 33096bdf..987e2303 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,57 @@ +#### [0.4.15] - 2025-03-22 +- Upgraded caching system described `docs/caching.md` +- Module `cache/memorycache.py` removed +- Enhanced caching system with backward compatibility: + - Unified cache key generation with support for existing naming patterns + - Improved Redis operation function with better error handling + - Updated precache module to use consistent Redis interface + - Integrated revalidator with the invalidation system for better performance + - Added comprehensive documentation for the caching system + - Enhanced cached_query to support template-based cache keys + - Standardized error handling across all cache operations +- Optimized cache invalidation system: + - Added targeted invalidation for individual entities (authors, topics) + - Improved revalidation manager with individual object processing + - Implemented batched processing for high-volume invalidations + - Reduced Redis operations by using precise key invalidation instead of prefix-based wipes + - Added special handling for slug changes in topics +- Unified caching system for all models: + - Implemented abstract functions `cache_data`, `get_cached_data` and `invalidate_cache_by_prefix` + - Added `cached_query` function for unified approach to query caching + - Updated resolvers `author.py` and `topic.py` to use the new caching API + - Improved logging for cache operations to simplify debugging + - Optimized Redis memory usage through key format unification +- Improved caching and sorting in Topic and Author modules: + - Added support for dictionary sorting parameters in `by` for both modules + - Optimized cache key generation for stable behavior with various parameters + - Enhanced sorting logic with direction support and arbitrary fields + - Added `by` parameter support in the API for getting topics by community +- Performance optimizations for author-related queries: + - Added SQLAlchemy-managed indexes to `Author`, `AuthorFollower`, `AuthorRating` and `AuthorBookmark` models + - Implemented persistent Redis caching for author queries without TTL (invalidated only on changes) + - Optimized author retrieval with separate endpoints: + - `get_authors_all` - returns all non-deleted authors without statistics + - `get_authors_paginated` - returns authors with statistics and pagination support + - `load_authors_by` - optimized to use caching and efficient sorting + - Improved SQL queries with optimized JOIN conditions and efficient filtering + - Added pre-aggregation of statistics (shouts count, followers count) in single efficient queries + - Implemented robust cache invalidation on author updates + - Created necessary indexes for author lookups by user ID, slug, and timestamps + +#### [0.4.14] - 2025-03-21 +- Significant performance improvements for topic queries: + - Added database indexes to optimize JOIN operations + - Implemented persistent Redis caching for topic queries (no TTL, invalidated only on changes) + - Optimized topic retrieval with separate endpoints for different use cases: + - `get_topics_all` - returns all topics without statistics for lightweight listing + - `get_topics_paginated` - returns topics with statistics and pagination support + - `get_topics_by_community` - adds pagination and optimized filtering by community + - Added SQLAlchemy-managed indexes directly in ORM models for automatic schema maintenance + - Created `sync_indexes()` function for automatic index synchronization during app startup + - Reduced database load by pre-aggregating statistics in optimized SQL queries + - Added robust cache invalidation on topic create/update/delete operations + - Improved query optimization with proper JOIN conditions and specific partial indexes + #### [0.4.13] - 2025-03-20 - Fixed Topic objects serialization error in cache/memorycache.py - Improved CustomJSONEncoder to support SQLAlchemy models with dict() method @@ -244,22 +298,4 @@ #### [0.2.7] -- `loadFollowedReactions` now with `login_required` -- notifier service api draft -- added `shout` visibility kind in schema -- community isolated from author in orm - - -#### [0.2.6] -- redis connection pool -- auth context fixes -- communities orm, resolvers, schema - - -#### [0.2.5] -- restructured -- all users have their profiles as authors in core -- `gittask`, `inbox` and `auth` logics removed -- `settings` moved to base and now smaller -- new outside auth schema -- removed `gittask`, `auth`, `inbox`, `migration` +- `loadFollowedReactions` now with ` \ No newline at end of file diff --git a/cache/cache.py b/cache/cache.py index caeecfa5..5b8ea5d6 100644 --- a/cache/cache.py +++ b/cache/cache.py @@ -1,6 +1,35 @@ +""" +Caching system for the Discours platform +---------------------------------------- + +This module provides a comprehensive caching solution with these key components: + +1. KEY NAMING CONVENTIONS: + - Entity-based keys: "entity:property:value" (e.g., "author:id:123") + - Collection keys: "entity:collection:params" (e.g., "authors:stats:limit=10:offset=0") + - Special case keys: Maintained for backwards compatibility (e.g., "topic_shouts_123") + +2. CORE FUNCTIONS: + - cached_query(): High-level function for retrieving cached data or executing queries + +3. ENTITY-SPECIFIC FUNCTIONS: + - cache_author(), cache_topic(): Cache entity data + - get_cached_author(), get_cached_topic(): Retrieve entity data from cache + - invalidate_cache_by_prefix(): Invalidate all keys with a specific prefix + +4. CACHE INVALIDATION STRATEGY: + - Direct invalidation via invalidate_* functions for immediate changes + - Delayed invalidation via revalidation_manager for background processing + - Event-based triggers for automatic cache updates (see triggers.py) + +To maintain consistency with the existing codebase, this module preserves +the original key naming patterns while providing a more structured approach +for new cache operations. +""" + import asyncio import json -from typing import List +from typing import Any, Dict, List, Optional, Union import orjson from sqlalchemy import and_, join, select @@ -20,8 +49,10 @@ DEFAULT_FOLLOWS = { "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], } -CACHE_TTL = 300 # 5 минут +CACHE_TTL = 300 # 5 minutes +# Key templates for common entity types +# These are used throughout the codebase and should be maintained for compatibility CACHE_KEYS = { "TOPIC_ID": "topic:id:{}", "TOPIC_SLUG": "topic:slug:{}", @@ -38,8 +69,8 @@ CACHE_KEYS = { async def cache_topic(topic: dict): payload = json.dumps(topic, cls=CustomJSONEncoder) await asyncio.gather( - redis_operation("SET", f"topic:id:{topic['id']}", payload), - redis_operation("SET", f"topic:slug:{topic['slug']}", payload), + redis.execute("SET", f"topic:id:{topic['id']}", payload), + redis.execute("SET", f"topic:slug:{topic['slug']}", payload), ) @@ -47,29 +78,29 @@ async def cache_topic(topic: dict): async def cache_author(author: dict): payload = json.dumps(author, cls=CustomJSONEncoder) await asyncio.gather( - redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])), - redis_operation("SET", f"author:id:{author['id']}", payload), + redis.execute("SET", f"author:user:{author['user'].strip()}", str(author["id"])), + redis.execute("SET", f"author:id:{author['id']}", payload), ) # Cache follows data async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True): key = f"author:follows-{entity_type}s:{follower_id}" - follows_str = await redis_operation("GET", key) + follows_str = await redis.execute("GET", key) follows = orjson.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] if is_insert: if entity_id not in follows: follows.append(entity_id) else: follows = [eid for eid in follows if eid != entity_id] - await redis_operation("SET", key, json.dumps(follows, cls=CustomJSONEncoder)) + await redis.execute("SET", key, json.dumps(follows, cls=CustomJSONEncoder)) await update_follower_stat(follower_id, entity_type, len(follows)) # Update follower statistics async def update_follower_stat(follower_id, entity_type, count): follower_key = f"author:id:{follower_id}" - follower_str = await redis_operation("GET", follower_key) + follower_str = await redis.execute("GET", follower_key) follower = orjson.loads(follower_str) if follower_str else None if follower: follower["stat"] = {f"{entity_type}s": count} @@ -79,7 +110,7 @@ async def update_follower_stat(follower_id, entity_type, count): # Get author from cache async def get_cached_author(author_id: int, get_with_stat): author_key = f"author:id:{author_id}" - result = await redis_operation("GET", author_key) + result = await redis.execute("GET", author_key) if result: return orjson.loads(result) # Load from database if not found in cache @@ -104,7 +135,7 @@ async def get_cached_topic(topic_id: int): dict: Topic data or None if not found. """ topic_key = f"topic:id:{topic_id}" - cached_topic = await redis_operation("GET", topic_key) + cached_topic = await redis.execute("GET", topic_key) if cached_topic: return orjson.loads(cached_topic) @@ -113,7 +144,7 @@ async def get_cached_topic(topic_id: int): topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none() if topic: topic_dict = topic.dict() - await redis_operation("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) + await redis.execute("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) return topic_dict return None @@ -122,7 +153,7 @@ async def get_cached_topic(topic_id: int): # Get topic by slug from cache async def get_cached_topic_by_slug(slug: str, get_with_stat): topic_key = f"topic:slug:{slug}" - result = await redis_operation("GET", topic_key) + result = await redis.execute("GET", topic_key) if result: return orjson.loads(result) # Load from database if not found in cache @@ -139,7 +170,7 @@ async def get_cached_topic_by_slug(slug: str, get_with_stat): async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]: # Fetch all author data concurrently keys = [f"author:id:{author_id}" for author_id in author_ids] - results = await asyncio.gather(*(redis_operation("GET", key) for key in keys)) + results = await asyncio.gather(*(redis.execute("GET", key) for key in keys)) authors = [orjson.loads(result) if result else None for result in results] # Load missing authors from database and cache missing_indices = [index for index, author in enumerate(authors) if author is None] @@ -166,7 +197,7 @@ async def get_cached_topic_followers(topic_id: int): """ try: cache_key = CACHE_KEYS["TOPIC_FOLLOWERS"].format(topic_id) - cached = await redis_operation("GET", cache_key) + cached = await redis.execute("GET", cache_key) if cached: followers_ids = orjson.loads(cached) @@ -182,7 +213,7 @@ async def get_cached_topic_followers(topic_id: int): .all() ] - await redis_operation("SETEX", cache_key, value=orjson.dumps(followers_ids), ttl=CACHE_TTL) + await redis.execute("SETEX", cache_key, CACHE_TTL, orjson.dumps(followers_ids)) followers = await get_cached_authors_by_ids(followers_ids) logger.debug(f"Cached {len(followers)} followers for topic #{topic_id}") return followers @@ -195,7 +226,7 @@ async def get_cached_topic_followers(topic_id: int): # Get cached author followers async def get_cached_author_followers(author_id: int): # Check cache for data - cached = await redis_operation("GET", f"author:followers:{author_id}") + cached = await redis.execute("GET", f"author:followers:{author_id}") if cached: followers_ids = orjson.loads(cached) followers = await get_cached_authors_by_ids(followers_ids) @@ -211,7 +242,7 @@ async def get_cached_author_followers(author_id: int): .filter(AuthorFollower.author == author_id, Author.id != author_id) .all() ] - await redis_operation("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids)) + await redis.execute("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids)) followers = await get_cached_authors_by_ids(followers_ids) return followers @@ -219,7 +250,7 @@ async def get_cached_author_followers(author_id: int): # Get cached follower authors async def get_cached_follower_authors(author_id: int): # Attempt to retrieve authors from cache - cached = await redis_operation("GET", f"author:follows-authors:{author_id}") + cached = await redis.execute("GET", f"author:follows-authors:{author_id}") if cached: authors_ids = orjson.loads(cached) else: @@ -233,7 +264,7 @@ async def get_cached_follower_authors(author_id: int): .where(AuthorFollower.follower == author_id) ).all() ] - await redis_operation("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids)) + await redis.execute("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids)) authors = await get_cached_authors_by_ids(authors_ids) return authors @@ -242,7 +273,7 @@ async def get_cached_follower_authors(author_id: int): # Get cached follower topics async def get_cached_follower_topics(author_id: int): # Attempt to retrieve topics from cache - cached = await redis_operation("GET", f"author:follows-topics:{author_id}") + cached = await redis.execute("GET", f"author:follows-topics:{author_id}") if cached: topics_ids = orjson.loads(cached) else: @@ -255,11 +286,11 @@ async def get_cached_follower_topics(author_id: int): .where(TopicFollower.follower == author_id) .all() ] - await redis_operation("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids)) + await redis.execute("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids)) topics = [] for topic_id in topics_ids: - topic_str = await redis_operation("GET", f"topic:id:{topic_id}") + topic_str = await redis.execute("GET", f"topic:id:{topic_id}") if topic_str: topic = orjson.loads(topic_str) if topic and topic not in topics: @@ -281,10 +312,10 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat): dict: Dictionary with author data or None if not found. """ # Attempt to find author ID by user_id in Redis cache - author_id = await redis_operation("GET", f"author:user:{user_id.strip()}") + author_id = await redis.execute("GET", f"author:user:{user_id.strip()}") if author_id: # If ID is found, get full author data by ID - author_data = await redis_operation("GET", f"author:id:{author_id}") + author_data = await redis.execute("GET", f"author:id:{author_id}") if author_data: return orjson.loads(author_data) @@ -296,8 +327,8 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat): author = authors[0] author_dict = author.dict() await asyncio.gather( - redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)), - redis_operation("SET", f"author:id:{author.id}", orjson.dumps(author_dict)), + redis.execute("SET", f"author:user:{user_id.strip()}", str(author.id)), + redis.execute("SET", f"author:id:{author.id}", orjson.dumps(author_dict)), ) return author_dict @@ -318,7 +349,7 @@ async def get_cached_topic_authors(topic_id: int): """ # Attempt to get a list of author IDs from cache rkey = f"topic:authors:{topic_id}" - cached_authors_ids = await redis_operation("GET", rkey) + cached_authors_ids = await redis.execute("GET", rkey) if cached_authors_ids: authors_ids = orjson.loads(cached_authors_ids) else: @@ -332,7 +363,7 @@ async def get_cached_topic_authors(topic_id: int): ) authors_ids = [author_id for (author_id,) in session.execute(query).all()] # Cache the retrieved author IDs - await redis_operation("SET", rkey, orjson.dumps(authors_ids)) + await redis.execute("SET", rkey, orjson.dumps(authors_ids)) # Retrieve full author details from cached IDs if authors_ids: @@ -353,11 +384,11 @@ async def invalidate_shouts_cache(cache_keys: List[str]): cache_key = f"shouts:{key}" # Удаляем основной кэш - await redis_operation("DEL", cache_key) + await redis.execute("DEL", cache_key) logger.debug(f"Invalidated cache key: {cache_key}") # Добавляем ключ в список инвалидированных с TTL - await redis_operation("SETEX", f"{cache_key}:invalidated", value="1", ttl=CACHE_TTL) + await redis.execute("SETEX", f"{cache_key}:invalidated", CACHE_TTL, "1") # Если это кэш темы, инвалидируем также связанные ключи if key.startswith("topic_"): @@ -369,7 +400,7 @@ async def invalidate_shouts_cache(cache_keys: List[str]): f"topic:stats:{topic_id}", ] for related_key in related_keys: - await redis_operation("DEL", related_key) + await redis.execute("DEL", related_key) logger.debug(f"Invalidated related key: {related_key}") except Exception as e: @@ -380,13 +411,13 @@ async def cache_topic_shouts(topic_id: int, shouts: List[dict]): """Кэширует список публикаций для темы""" key = f"topic_shouts_{topic_id}" payload = json.dumps(shouts, cls=CustomJSONEncoder) - await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL) + await redis.execute("SETEX", key, CACHE_TTL, payload) async def get_cached_topic_shouts(topic_id: int) -> List[dict]: """Получает кэшированный список публикаций для темы""" key = f"topic_shouts_{topic_id}" - cached = await redis_operation("GET", key) + cached = await redis.execute("GET", key) if cached: return orjson.loads(cached) return None @@ -432,27 +463,7 @@ async def invalidate_shout_related_cache(shout: Shout, author_id: int): await invalidate_shouts_cache(list(cache_keys)) -async def redis_operation(operation: str, key: str, value=None, ttl=None): - """ - Унифицированная функция для работы с Redis - - Args: - operation: 'GET', 'SET', 'DEL', 'SETEX' - key: ключ - value: значение (для SET/SETEX) - ttl: время жизни в секундах (для SETEX) - """ - try: - if operation == "GET": - return await redis.execute("GET", key) - elif operation == "SET": - await redis.execute("SET", key, value) - elif operation == "SETEX": - await redis.execute("SETEX", key, ttl or CACHE_TTL, value) - elif operation == "DEL": - await redis.execute("DEL", key) - except Exception as e: - logger.error(f"Redis {operation} error for key {key}: {e}") +# Function removed - direct Redis calls used throughout the module instead async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_method): @@ -466,7 +477,7 @@ async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_ cache_method: метод кэширования """ key = f"{entity_type}:id:{entity_id}" - cached = await redis_operation("GET", key) + cached = await redis.execute("GET", key) if cached: return orjson.loads(cached) @@ -497,3 +508,120 @@ async def cache_by_id(entity, entity_id: int, cache_method): d = x.dict() await cache_method(d) return d + + +# Универсальная функция для сохранения данных в кеш +async def cache_data(key: str, data: Any, ttl: Optional[int] = None) -> None: + """ + Сохраняет данные в кеш по указанному ключу. + + Args: + key: Ключ кеша + data: Данные для сохранения + ttl: Время жизни кеша в секундах (None - бессрочно) + """ + try: + payload = json.dumps(data, cls=CustomJSONEncoder) + if ttl: + await redis.execute("SETEX", key, ttl, payload) + else: + await redis.execute("SET", key, payload) + logger.debug(f"Данные сохранены в кеш по ключу {key}") + except Exception as e: + logger.error(f"Ошибка при сохранении данных в кеш: {e}") + + +# Универсальная функция для получения данных из кеша +async def get_cached_data(key: str) -> Optional[Any]: + """ + Получает данные из кеша по указанному ключу. + + Args: + key: Ключ кеша + + Returns: + Any: Данные из кеша или None, если данных нет + """ + try: + cached_data = await redis.execute("GET", key) + if cached_data: + logger.debug(f"Данные получены из кеша по ключу {key}") + return orjson.loads(cached_data) + return None + except Exception as e: + logger.error(f"Ошибка при получении данных из кеша: {e}") + return None + + +# Универсальная функция для инвалидации кеша по префиксу +async def invalidate_cache_by_prefix(prefix: str) -> None: + """ + Инвалидирует все ключи кеша с указанным префиксом. + + Args: + prefix: Префикс ключей кеша для инвалидации + """ + try: + keys = await redis.execute("KEYS", f"{prefix}:*") + if keys: + await redis.execute("DEL", *keys) + logger.debug(f"Удалено {len(keys)} ключей кеша с префиксом {prefix}") + except Exception as e: + logger.error(f"Ошибка при инвалидации кеша: {e}") + + +# Универсальная функция для получения и кеширования данных +async def cached_query( + cache_key: str, + query_func: callable, + ttl: Optional[int] = None, + force_refresh: bool = False, + use_key_format: bool = True, + **query_params, +) -> Any: + """ + Gets data from cache or executes query and saves result to cache. + Supports existing key formats for compatibility. + + Args: + cache_key: Cache key or key template from CACHE_KEYS + query_func: Function to execute the query + ttl: Cache TTL in seconds (None - indefinite) + force_refresh: Force cache refresh + use_key_format: Whether to check if cache_key matches a key template in CACHE_KEYS + **query_params: Parameters to pass to the query function + + Returns: + Any: Data from cache or query result + """ + # Check if cache_key matches a pattern in CACHE_KEYS + actual_key = cache_key + if use_key_format and "{}" in cache_key: + # Look for a template match in CACHE_KEYS + for key_name, key_format in CACHE_KEYS.items(): + if cache_key == key_format: + # We have a match, now look for the id or value to format with + for param_name, param_value in query_params.items(): + if param_name in ["id", "slug", "user", "topic_id", "author_id"]: + actual_key = cache_key.format(param_value) + break + + # If not forcing refresh, try to get data from cache + if not force_refresh: + cached_result = await get_cached_data(actual_key) + if cached_result is not None: + return cached_result + + # If data not in cache or refresh required, execute query + try: + result = await query_func(**query_params) + if result is not None: + # Save result to cache + await cache_data(actual_key, result, ttl) + return result + except Exception as e: + logger.error(f"Error executing query for caching: {e}") + # In case of error, return data from cache if not forcing refresh + if not force_refresh: + return await get_cached_data(actual_key) + raise diff --git a/cache/memorycache.py b/cache/memorycache.py deleted file mode 100644 index 80035ea6..00000000 --- a/cache/memorycache.py +++ /dev/null @@ -1,181 +0,0 @@ -""" -Модуль для кеширования данных с использованием Redis. -Предоставляет API, совместимый с dogpile.cache для поддержки обратной совместимости. -""" - -import functools -import hashlib -import inspect -import json -import logging -import pickle -from typing import Callable, Optional - -import orjson - -from services.redis import redis -from utils.encoders import CustomJSONEncoder - -logger = logging.getLogger(__name__) - -DEFAULT_TTL = 300 # время жизни кеша в секундах (5 минут) - - -class RedisCache: - """ - Класс, предоставляющий API, совместимый с dogpile.cache, но использующий Redis. - - Примеры: - >>> cache_region = RedisCache() - >>> @cache_region.cache_on_arguments("my_key") - ... def my_func(arg1, arg2): - ... return arg1 + arg2 - """ - - def __init__(self, ttl: int = DEFAULT_TTL): - """ - Инициализация объекта кеша. - - Args: - ttl: Время жизни кеша в секундах - """ - self.ttl = ttl - - def cache_on_arguments(self, cache_key: Optional[str] = None) -> Callable: - """ - Декоратор для кеширования результатов функций с использованием Redis. - - Args: - cache_key: Опциональный базовый ключ кеша. Если не указан, генерируется из сигнатуры функции. - - Returns: - Декоратор для кеширования функции - - Примеры: - >>> @cache_region.cache_on_arguments("users") - ... def get_users(): - ... return db.query(User).all() - """ - - def decorator(func: Callable) -> Callable: - @functools.wraps(func) - async def wrapper(*args, **kwargs): - # Генерация ключа кеша - key = self._generate_cache_key(func, cache_key, *args, **kwargs) - - # Попытка получить данные из кеша - cached_data = await redis.get(key) - if cached_data: - try: - return orjson.loads(cached_data) - except Exception: - # Если не удалось десериализовать как JSON, попробуем как pickle - return pickle.loads(cached_data.encode()) - - # Вызов оригинальной функции, если данных в кеше нет - result = func(*args, **kwargs) - - # Сохранение результата в кеш - try: - # Пытаемся сериализовать как JSON - serialized = json.dumps(result, cls=CustomJSONEncoder) - except (TypeError, ValueError): - # Если не удалось, используем pickle - serialized = pickle.dumps(result).decode() - - await redis.set(key, serialized, ex=self.ttl) - return result - - @functools.wraps(func) - def sync_wrapper(*args, **kwargs): - # Для функций, которые не являются корутинами - # Генерация ключа кеша - key = self._generate_cache_key(func, cache_key, *args, **kwargs) - - # Синхронная версия не использует await, поэтому результат всегда вычисляется - result = func(*args, **kwargs) - - # Асинхронно записываем в кэш (будет выполнено позже) - try: - import asyncio - - # Попытка сериализовать результат в JSON - try: - serialized = json.dumps(result, cls=CustomJSONEncoder) - except (TypeError, ValueError) as e: - logger.debug(f"JSON сериализация не удалась, используем pickle: {e}") - # Если не удалось сериализовать как JSON, используем pickle - serialized = pickle.dumps(result).decode() - - asyncio.create_task(redis.set(key, serialized, ex=self.ttl)) - except Exception as e: - logger.error(f"Ошибка при кешировании результата: {e}") - # Для отладки добавляем информацию о типе объекта - logger.debug(f"Тип результата: {type(result)}") - if hasattr(result, "__class__"): - logger.debug(f"Класс результата: {result.__class__.__name__}") - - return result - - # Возвращаем асинхронный или синхронный враппер в зависимости от типа функции - if inspect.iscoroutinefunction(func): - return wrapper - else: - return sync_wrapper - - return decorator - - def _generate_cache_key(self, func: Callable, base_key: Optional[str], *args, **kwargs) -> str: - """ - Генерирует ключ кеша на основе функции и её аргументов. - - Args: - func: Кешируемая функция - base_key: Базовый ключ кеша - *args: Позиционные аргументы функции - **kwargs: Именованные аргументы функции - - Returns: - Строковый ключ для кеша - """ - if base_key: - key_prefix = f"cache:{base_key}" - else: - key_prefix = f"cache:{func.__module__}.{func.__name__}" - - # Создаем хеш аргументов - arg_hash = hashlib.md5() - - # Добавляем позиционные аргументы - for arg in args: - try: - arg_hash.update(str(arg).encode()) - except Exception: - arg_hash.update(str(id(arg)).encode()) - - # Добавляем именованные аргументы (сортируем для детерминированности) - for k in sorted(kwargs.keys()): - try: - arg_hash.update(f"{k}:{kwargs[k]}".encode()) - except Exception: - arg_hash.update(f"{k}:{id(kwargs[k])}".encode()) - - return f"{key_prefix}:{arg_hash.hexdigest()}" - - def invalidate(self, func: Callable, *args, **kwargs) -> None: - """ - Инвалидирует (удаляет) кеш для конкретной функции с конкретными аргументами. - - Args: - func: Кешированная функция - *args: Позиционные аргументы функции - **kwargs: Именованные аргументы функции - """ - key = self._generate_cache_key(func, None, *args, **kwargs) - import asyncio - - asyncio.create_task(redis.execute("DEL", key)) - - -# Экземпляр класса RedisCache для использования в коде -cache_region = RedisCache() diff --git a/cache/precache.py b/cache/precache.py index 5df91f2d..23844024 100644 --- a/cache/precache.py +++ b/cache/precache.py @@ -1,7 +1,6 @@ import asyncio import json -import orjson from sqlalchemy import and_, join, select from cache.cache import cache_author, cache_topic @@ -87,11 +86,15 @@ async def precache_data(): # Преобразуем словарь в список аргументов для HSET if value: - flattened = [] - for field, val in value.items(): - flattened.extend([field, val]) - - await redis.execute("HSET", key, *flattened) + # Если значение - словарь, преобразуем его в плоский список для HSET + if isinstance(value, dict): + flattened = [] + for field, val in value.items(): + flattened.extend([field, val]) + await redis.execute("HSET", key, *flattened) + else: + # Предполагаем, что значение уже содержит список + await redis.execute("HSET", key, *value) logger.info(f"redis hash '{key}' was restored") with local_session() as session: diff --git a/cache/revalidator.py b/cache/revalidator.py index 125b9f5f..7be5041c 100644 --- a/cache/revalidator.py +++ b/cache/revalidator.py @@ -1,17 +1,26 @@ import asyncio -from cache.cache import cache_author, cache_topic, get_cached_author, get_cached_topic +from cache.cache import ( + cache_author, + cache_topic, + get_cached_author, + get_cached_topic, + invalidate_cache_by_prefix, +) from resolvers.stat import get_with_stat from utils.logger import root_logger as logger +CACHE_REVALIDATION_INTERVAL = 300 # 5 minutes + class CacheRevalidationManager: - def __init__(self, interval=60): + def __init__(self, interval=CACHE_REVALIDATION_INTERVAL): """Инициализация менеджера с заданным интервалом проверки (в секундах).""" self.interval = interval self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()} self.lock = asyncio.Lock() self.running = True + self.MAX_BATCH_SIZE = 10 # Максимальное количество элементов для поштучной обработки async def start(self): """Запуск фонового воркера для ревалидации кэша.""" @@ -32,22 +41,107 @@ class CacheRevalidationManager: """Обновление кэша для всех сущностей, требующих ревалидации.""" async with self.lock: # Ревалидация кэша авторов - for author_id in self.items_to_revalidate["authors"]: - author = await get_cached_author(author_id, get_with_stat) - if author: - await cache_author(author) - self.items_to_revalidate["authors"].clear() + if self.items_to_revalidate["authors"]: + logger.debug(f"Revalidating {len(self.items_to_revalidate['authors'])} authors") + for author_id in self.items_to_revalidate["authors"]: + if author_id == "all": + await invalidate_cache_by_prefix("authors") + break + author = await get_cached_author(author_id, get_with_stat) + if author: + await cache_author(author) + self.items_to_revalidate["authors"].clear() # Ревалидация кэша тем - for topic_id in self.items_to_revalidate["topics"]: - topic = await get_cached_topic(topic_id) - if topic: - await cache_topic(topic) - self.items_to_revalidate["topics"].clear() + if self.items_to_revalidate["topics"]: + logger.debug(f"Revalidating {len(self.items_to_revalidate['topics'])} topics") + for topic_id in self.items_to_revalidate["topics"]: + if topic_id == "all": + await invalidate_cache_by_prefix("topics") + break + topic = await get_cached_topic(topic_id) + if topic: + await cache_topic(topic) + self.items_to_revalidate["topics"].clear() + + # Ревалидация шаутов (публикаций) + if self.items_to_revalidate["shouts"]: + shouts_count = len(self.items_to_revalidate["shouts"]) + logger.debug(f"Revalidating {shouts_count} shouts") + + # Проверяем наличие специального флага 'all' + if "all" in self.items_to_revalidate["shouts"]: + await invalidate_cache_by_prefix("shouts") + # Если элементов много, но не 'all', используем специфический подход + elif shouts_count > self.MAX_BATCH_SIZE: + # Инвалидируем только collections keys, которые затрагивают много сущностей + collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "shouts:*")) + if collection_keys: + await self._redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей шаутов") + + # Обновляем кеш каждого конкретного шаута + for shout_id in self.items_to_revalidate["shouts"]: + if shout_id != "all": + # Точечная инвалидация для каждого shout_id + specific_keys = [f"shout:id:{shout_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + else: + # Если элементов немного, обрабатываем каждый + for shout_id in self.items_to_revalidate["shouts"]: + if shout_id != "all": + # Точечная инвалидация для каждого shout_id + specific_keys = [f"shout:id:{shout_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + + self.items_to_revalidate["shouts"].clear() + + # Аналогично для реакций - точечная инвалидация + if self.items_to_revalidate["reactions"]: + reactions_count = len(self.items_to_revalidate["reactions"]) + logger.debug(f"Revalidating {reactions_count} reactions") + + if "all" in self.items_to_revalidate["reactions"]: + await invalidate_cache_by_prefix("reactions") + elif reactions_count > self.MAX_BATCH_SIZE: + # Инвалидируем только collections keys для реакций + collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "reactions:*")) + if collection_keys: + await self._redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей реакций") + + # Точечная инвалидация для каждой реакции + for reaction_id in self.items_to_revalidate["reactions"]: + if reaction_id != "all": + specific_keys = [f"reaction:id:{reaction_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + else: + # Точечная инвалидация для каждой реакции + for reaction_id in self.items_to_revalidate["reactions"]: + if reaction_id != "all": + specific_keys = [f"reaction:id:{reaction_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + + self.items_to_revalidate["reactions"].clear() def mark_for_revalidation(self, entity_id, entity_type): """Отметить сущность для ревалидации.""" - self.items_to_revalidate[entity_type].add(entity_id) + if entity_id and entity_type: + self.items_to_revalidate[entity_type].add(entity_id) + + def invalidate_all(self, entity_type): + """Пометить для инвалидации все элементы указанного типа.""" + logger.debug(f"Marking all {entity_type} for invalidation") + # Особый флаг для полной инвалидации + self.items_to_revalidate[entity_type].add("all") async def stop(self): """Остановка фонового воркера.""" @@ -60,4 +154,4 @@ class CacheRevalidationManager: pass -revalidation_manager = CacheRevalidationManager(interval=300) # Ревалидация каждые 5 минут +revalidation_manager = CacheRevalidationManager() diff --git a/docs/caching.md b/docs/caching.md new file mode 100644 index 00000000..7c025be2 --- /dev/null +++ b/docs/caching.md @@ -0,0 +1,279 @@ +# Система кеширования Discours + +## Общее описание + +Система кеширования Discours - это комплексное решение для повышения производительности платформы. Она использует Redis для хранения часто запрашиваемых данных и уменьшения нагрузки на основную базу данных. + +Кеширование реализовано как многоуровневая система, состоящая из нескольких модулей: + +- `cache.py` - основной модуль с функциями кеширования +- `revalidator.py` - асинхронный менеджер ревалидации кеша +- `triggers.py` - триггеры событий SQLAlchemy для автоматической ревалидации +- `precache.py` - предварительное кеширование данных при старте приложения + +## Ключевые компоненты + +### 1. Форматы ключей кеша + +Система поддерживает несколько форматов ключей для обеспечения совместимости и удобства использования: + +- **Ключи сущностей**: `entity:property:value` (например, `author:id:123`) +- **Ключи коллекций**: `entity:collection:params` (например, `authors:stats:limit=10:offset=0`) +- **Специальные ключи**: для обратной совместимости (например, `topic_shouts_123`) + +Все стандартные форматы ключей хранятся в словаре `CACHE_KEYS`: + +```python +CACHE_KEYS = { + "TOPIC_ID": "topic:id:{}", + "TOPIC_SLUG": "topic:slug:{}", + "AUTHOR_ID": "author:id:{}", + # и другие... +} +``` + +### 2. Основные функции кеширования + +#### Структура ключей + +Вместо генерации ключей через вспомогательные функции, система следует строгим конвенциям формирования ключей: + +1. **Ключи для отдельных сущностей** строятся по шаблону: + ``` + entity:property:value + ``` + Например: + - `topic:id:123` - тема с ID 123 + - `author:slug:john-doe` - автор со слагом "john-doe" + - `shout:id:456` - публикация с ID 456 + +2. **Ключи для коллекций** строятся по шаблону: + ``` + entity:collection[:filter1=value1:filter2=value2:...] + ``` + Например: + - `topics:all:basic` - базовый список всех тем + - `authors:stats:limit=10:offset=0:sort=name` - отсортированный список авторов с пагинацией + - `shouts:feed:limit=20:community=1` - лента публикаций с фильтром по сообществу + +3. **Специальные форматы ключей** для обратной совместимости: + ``` + entity_action_id + ``` + Например: + - `topic_shouts_123` - публикации для темы с ID 123 + +Во всех модулях системы разработчики должны явно формировать ключи в соответствии с этими конвенциями, что обеспечивает единообразие и предсказуемость кеширования. + +#### Работа с данными в кеше + +```python +async def cache_data(key, data, ttl=None) +async def get_cached_data(key) +``` + +Эти функции предоставляют универсальный интерфейс для сохранения и получения данных из кеша. Они напрямую используют Redis через вызовы `redis.execute()`. + +#### Высокоуровневое кеширование запросов + +```python +async def cached_query(cache_key, query_func, ttl=None, force_refresh=False, **query_params) +``` + +Функция `cached_query` объединяет получение данных из кеша и выполнение запроса в случае отсутствия данных в кеше. Это основная функция, которую следует использовать в резолверах для кеширования результатов запросов. + +### 3. Кеширование сущностей + +Для основных типов сущностей реализованы специальные функции: + +```python +async def cache_topic(topic: dict) +async def cache_author(author: dict) +async def get_cached_topic(topic_id: int) +async def get_cached_author(author_id: int, get_with_stat) +``` + +Эти функции упрощают работу с часто используемыми типами данных и обеспечивают единообразный подход к их кешированию. + +### 4. Работа со связями + +Для работы со связями между сущностями предназначены функции: + +```python +async def cache_follows(follower_id, entity_type, entity_id, is_insert=True) +async def get_cached_topic_followers(topic_id) +async def get_cached_author_followers(author_id) +async def get_cached_follower_topics(author_id) +``` + +Они позволяют эффективно кешировать и получать информацию о подписках, связях между авторами, темами и публикациями. + +## Система инвалидации кеша + +### 1. Прямая инвалидация + +Система поддерживает два типа инвалидации кеша: + +#### 1.1. Инвалидация по префиксу + +```python +async def invalidate_cache_by_prefix(prefix) +``` + +Позволяет инвалидировать все ключи кеша, начинающиеся с указанного префикса. Используется в резолверах для инвалидации группы кешей при массовых изменениях. + +#### 1.2. Точечная инвалидация + +```python +async def invalidate_authors_cache(author_id=None) +async def invalidate_topics_cache(topic_id=None) +``` + +Эти функции позволяют инвалидировать кеш только для конкретной сущности, что снижает нагрузку на Redis и предотвращает ненужную потерю кешированных данных. Если ID сущности не указан, используется инвалидация по префиксу. + +Примеры использования точечной инвалидации: + +```python +# Инвалидация кеша только для автора с ID 123 +await invalidate_authors_cache(123) + +# Инвалидация кеша только для темы с ID 456 +await invalidate_topics_cache(456) +``` + +### 2. Отложенная инвалидация + +Модуль `revalidator.py` реализует систему отложенной инвалидации кеша через класс `CacheRevalidationManager`: + +```python +class CacheRevalidationManager: + # ... + async def process_revalidation(self): + # ... + def mark_for_revalidation(self, entity_id, entity_type): + # ... +``` + +Менеджер ревалидации работает как асинхронный фоновый процесс, который периодически (по умолчанию каждые 5 минут) проверяет наличие сущностей для ревалидации. + +Особенности реализации: +- Для авторов и тем используется поштучная ревалидация каждой записи +- Для шаутов и реакций используется батчевая обработка, с порогом в 10 элементов +- При достижении порога система переключается на инвалидацию коллекций вместо поштучной обработки +- Специальный флаг `all` позволяет запустить полную инвалидацию всех записей типа + +### 3. Автоматическая инвалидация через триггеры + +Модуль `triggers.py` регистрирует обработчики событий SQLAlchemy, которые автоматически отмечают сущности для ревалидации при изменении данных в базе: + +```python +def events_register(): + event.listen(Author, "after_update", mark_for_revalidation) + event.listen(Topic, "after_update", mark_for_revalidation) + # и другие... +``` + +Триггеры имеют следующие особенности: +- Реагируют на события вставки, обновления и удаления +- Отмечают затронутые сущности для отложенной ревалидации +- Учитывают связи между сущностями (например, при изменении темы обновляются связанные шауты) + +## Предварительное кеширование + +Модуль `precache.py` реализует предварительное кеширование часто используемых данных при старте приложения: + +```python +async def precache_data(): + # ... +``` + +Эта функция выполняется при запуске приложения и заполняет кеш данными, которые будут часто запрашиваться пользователями. + +## Примеры использования + +### Простое кеширование результата запроса + +```python +async def get_topics_with_stats(limit=10, offset=0, by="title"): + # Формирование ключа кеша по конвенции + cache_key = f"topics:stats:limit={limit}:offset={offset}:sort={by}" + + cached_data = await get_cached_data(cache_key) + if cached_data: + return cached_data + + # Выполнение запроса к базе данных + result = ... # логика получения данных + + await cache_data(cache_key, result, ttl=300) + return result +``` + +### Использование обобщенной функции cached_query + +```python +async def get_topics_with_stats(limit=10, offset=0, by="title"): + async def fetch_data(limit, offset, by): + # Логика получения данных + return result + + # Формирование ключа кеша по конвенции + cache_key = f"topics:stats:limit={limit}:offset={offset}:sort={by}" + + return await cached_query( + cache_key, + fetch_data, + ttl=300, + limit=limit, + offset=offset, + by=by + ) +``` + +### Точечная инвалидация кеша при изменении данных + +```python +async def update_topic(topic_id, new_data): + # Обновление данных в базе + # ... + + # Точечная инвалидация кеша только для измененной темы + await invalidate_topics_cache(topic_id) + + return updated_topic +``` + +## Отладка и мониторинг + +Система кеширования использует логгер для отслеживания операций: + +```python +logger.debug(f"Данные получены из кеша по ключу {key}") +logger.debug(f"Удалено {len(keys)} ключей кеша с префиксом {prefix}") +logger.error(f"Ошибка при инвалидации кеша: {e}") +``` + +Это позволяет отслеживать работу кеша и выявлять возможные проблемы на ранних стадиях. + +## Рекомендации по использованию + +1. **Следуйте конвенциям формирования ключей** - это критически важно для консистентности и предсказуемости кеша. +2. **Не создавайте собственные форматы ключей** - используйте существующие шаблоны для обеспечения единообразия. +3. **Не забывайте об инвалидации** - всегда инвалидируйте кеш при изменении данных. +4. **Используйте точечную инвалидацию** - вместо инвалидации по префиксу для снижения нагрузки на Redis. +5. **Устанавливайте разумные TTL** - используйте разные значения TTL в зависимости от частоты изменения данных. +6. **Не кешируйте большие объемы данных** - кешируйте только то, что действительно необходимо для повышения производительности. + +## Технические детали реализации + +- **Сериализация данных**: используется `orjson` для эффективной сериализации и десериализации данных. +- **Форматирование даты и времени**: для корректной работы с датами используется `CustomJSONEncoder`. +- **Асинхронность**: все операции кеширования выполняются асинхронно для минимального влияния на производительность API. +- **Прямое взаимодействие с Redis**: все операции выполняются через прямые вызовы `redis.execute()` с обработкой ошибок. +- **Батчевая обработка**: для массовых операций используется пороговое значение, после которого применяются оптимизированные стратегии. + +## Известные ограничения + +1. **Согласованность данных** - система не гарантирует абсолютную согласованность данных в кеше и базе данных. +2. **Память** - необходимо следить за объемом данных в кеше, чтобы избежать проблем с памятью Redis. +3. **Производительность Redis** - при большом количестве операций с кешем может стать узким местом. diff --git a/docs/features.md b/docs/features.md index b6d1bdc5..e0ed3526 100644 --- a/docs/features.md +++ b/docs/features.md @@ -6,11 +6,7 @@ ## Мультидоменная авторизация -- Поддержка авторизации для разных доменов: - - *.dscrs.site (включая testing.dscrs.site) - - localhost[:port] - - testingdiscoursio-git-*-discoursio.vercel.app - - *.discours.io +- Поддержка авторизации для разных доменов - Автоматическое определение сервера авторизации - Корректная обработка CORS для всех поддерживаемых доменов @@ -35,10 +31,6 @@ ## CORS Configuration -- Поддерживаются домены: - - *.dscrs.site (включая testing.dscrs.site, core.dscrs.site) - - *.discours.io (включая testing.discours.io) - - localhost (включая порты) - Поддерживаемые методы: GET, POST, OPTIONS - Настроена поддержка credentials - Разрешенные заголовки: Authorization, Content-Type, X-Requested-With, DNT, Cache-Control diff --git a/orm/author.py b/orm/author.py index c83b9d4f..4be2c630 100644 --- a/orm/author.py +++ b/orm/author.py @@ -1,6 +1,6 @@ import time -from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String +from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from services.db import Base @@ -8,6 +8,15 @@ from services.db import Base class AuthorRating(Base): + """ + Рейтинг автора от другого автора. + + Attributes: + rater (int): ID оценивающего автора + author (int): ID оцениваемого автора + plus (bool): Положительная/отрицательная оценка + """ + __tablename__ = "author_rating" id = None # type: ignore @@ -15,8 +24,26 @@ class AuthorRating(Base): author = Column(ForeignKey("author.id"), primary_key=True) plus = Column(Boolean) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех оценок конкретного автора + Index("idx_author_rating_author", "author"), + # Индекс для быстрого поиска всех оценок, оставленных конкретным автором + Index("idx_author_rating_rater", "rater"), + ) + class AuthorFollower(Base): + """ + Подписка одного автора на другого. + + Attributes: + follower (int): ID подписчика + author (int): ID автора, на которого подписываются + created_at (int): Время создания подписки + auto (bool): Признак автоматической подписки + """ + __tablename__ = "author_follower" id = None # type: ignore @@ -25,16 +52,57 @@ class AuthorFollower(Base): created_at = Column(Integer, nullable=False, default=lambda: int(time.time())) auto = Column(Boolean, nullable=False, default=False) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех подписчиков автора + Index("idx_author_follower_author", "author"), + # Индекс для быстрого поиска всех авторов, на которых подписан конкретный автор + Index("idx_author_follower_follower", "follower"), + ) + class AuthorBookmark(Base): + """ + Закладка автора на публикацию. + + Attributes: + author (int): ID автора + shout (int): ID публикации + """ + __tablename__ = "author_bookmark" id = None # type: ignore author = Column(ForeignKey("author.id"), primary_key=True) shout = Column(ForeignKey("shout.id"), primary_key=True) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех закладок автора + Index("idx_author_bookmark_author", "author"), + # Индекс для быстрого поиска всех авторов, добавивших публикацию в закладки + Index("idx_author_bookmark_shout", "shout"), + ) + class Author(Base): + """ + Модель автора в системе. + + Attributes: + user (str): Идентификатор пользователя в системе авторизации + name (str): Отображаемое имя + slug (str): Уникальный строковый идентификатор + bio (str): Краткая биография/статус + about (str): Полное описание + pic (str): URL изображения профиля + links (dict): Ссылки на социальные сети и сайты + created_at (int): Время создания профиля + last_seen (int): Время последнего посещения + updated_at (int): Время последнего обновления + deleted_at (int): Время удаления (если профиль удален) + """ + __tablename__ = "author" user = Column(String) # unbounded link with authorizer's User type @@ -53,3 +121,17 @@ class Author(Base): # search_vector = Column( # TSVectorType("name", "slug", "bio", "about", regconfig="pg_catalog.russian") # ) + + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска по slug + Index("idx_author_slug", "slug"), + # Индекс для быстрого поиска по идентификатору пользователя + Index("idx_author_user", "user"), + # Индекс для фильтрации неудаленных авторов + Index("idx_author_deleted_at", "deleted_at", postgresql_where=deleted_at.is_(None)), + # Индекс для сортировки по времени создания (для новых авторов) + Index("idx_author_created_at", "created_at"), + # Индекс для сортировки по времени последнего посещения + Index("idx_author_last_seen", "last_seen"), + ) diff --git a/resolvers/author.py b/resolvers/author.py index 604f2bc6..e4cfd794 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -1,25 +1,196 @@ import asyncio import time +from typing import Optional -from sqlalchemy import desc, select, text +from sqlalchemy import select, text from cache.cache import ( cache_author, + cached_query, get_cached_author, get_cached_author_by_user_id, get_cached_author_followers, get_cached_follower_authors, get_cached_follower_topics, + invalidate_cache_by_prefix, ) from orm.author import Author -from orm.shout import ShoutAuthor, ShoutTopic -from orm.topic import Topic from resolvers.stat import get_with_stat from services.auth import login_required from services.db import local_session +from services.redis import redis from services.schema import mutation, query from utils.logger import root_logger as logger +DEFAULT_COMMUNITIES = [1] + + +# Вспомогательная функция для получения всех авторов без статистики +async def get_all_authors(): + """ + Получает всех авторов без статистики. + Используется для случаев, когда нужен полный список авторов без дополнительной информации. + + Returns: + list: Список всех авторов без статистики + """ + cache_key = "authors:all:basic" + + # Функция для получения всех авторов из БД + async def fetch_all_authors(): + logger.debug("Получаем список всех авторов из БД и кешируем результат") + + with local_session() as session: + # Запрос на получение базовой информации об авторах + authors_query = select(Author).where(Author.deleted_at.is_(None)) + authors = session.execute(authors_query).scalars().all() + + # Преобразуем авторов в словари + return [author.dict() for author in authors] + + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_all_authors) + + +# Вспомогательная функция для получения авторов со статистикой с пагинацией +async def get_authors_with_stats(limit=50, offset=0, by: Optional[str] = None): + """ + Получает авторов со статистикой с пагинацией. + + Args: + limit: Максимальное количество возвращаемых авторов + offset: Смещение для пагинации + by: Опциональный параметр сортировки (new/active) + + Returns: + list: Список авторов с их статистикой + """ + # Формируем ключ кеша с помощью универсальной функции + cache_key = f"authors:stats:limit={limit}:offset={offset}" + + # Функция для получения авторов из БД + async def fetch_authors_with_stats(): + logger.debug(f"Выполняем запрос на получение авторов со статистикой: limit={limit}, offset={offset}, by={by}") + + with local_session() as session: + # Базовый запрос для получения авторов + base_query = select(Author).where(Author.deleted_at.is_(None)) + + # Применяем сортировку + if by: + if isinstance(by, dict): + # Обработка словаря параметров сортировки + from sqlalchemy import asc, desc + + for field, direction in by.items(): + column = getattr(Author, field, None) + if column: + if direction.lower() == "desc": + base_query = base_query.order_by(desc(column)) + else: + base_query = base_query.order_by(column) + elif by == "new": + base_query = base_query.order_by(desc(Author.created_at)) + elif by == "active": + base_query = base_query.order_by(desc(Author.last_seen)) + else: + # По умолчанию сортируем по времени создания + base_query = base_query.order_by(desc(Author.created_at)) + else: + base_query = base_query.order_by(desc(Author.created_at)) + + # Применяем лимит и смещение + base_query = base_query.limit(limit).offset(offset) + + # Получаем авторов + authors = session.execute(base_query).scalars().all() + author_ids = [author.id for author in authors] + + if not author_ids: + return [] + + # Оптимизированный запрос для получения статистики по публикациям для авторов + shouts_stats_query = f""" + SELECT sa.author, COUNT(DISTINCT s.id) as shouts_count + FROM shout_author sa + JOIN shout s ON sa.shout = s.id AND s.deleted_at IS NULL AND s.published_at IS NOT NULL + WHERE sa.author IN ({",".join(map(str, author_ids))}) + GROUP BY sa.author + """ + shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))} + + # Запрос на получение статистики по подписчикам для авторов + followers_stats_query = f""" + SELECT author, COUNT(DISTINCT follower) as followers_count + FROM author_follower + WHERE author IN ({",".join(map(str, author_ids))}) + GROUP BY author + """ + followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))} + + # Формируем результат с добавлением статистики + result = [] + for author in authors: + author_dict = author.dict() + author_dict["stat"] = { + "shouts": shouts_stats.get(author.id, 0), + "followers": followers_stats.get(author.id, 0), + } + result.append(author_dict) + + # Кешируем каждого автора отдельно для использования в других функциях + await cache_author(author_dict) + + return result + + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_authors_with_stats) + + +# Функция для инвалидации кеша авторов +async def invalidate_authors_cache(author_id=None): + """ + Инвалидирует кеши авторов при изменении данных. + + Args: + author_id: Опциональный ID автора для точечной инвалидации. + Если не указан, инвалидируются все кеши авторов. + """ + if author_id: + # Точечная инвалидация конкретного автора + logger.debug(f"Инвалидация кеша для автора #{author_id}") + specific_keys = [ + f"author:id:{author_id}", + f"author:followers:{author_id}", + f"author:follows-authors:{author_id}", + f"author:follows-topics:{author_id}", + f"author:follows-shouts:{author_id}", + ] + + # Получаем user_id автора, если есть + with local_session() as session: + author = session.query(Author).filter(Author.id == author_id).first() + if author and author.user: + specific_keys.append(f"author:user:{author.user.strip()}") + + # Удаляем конкретные ключи + for key in specific_keys: + try: + await redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + except Exception as e: + logger.error(f"Ошибка при удалении ключа {key}: {e}") + + # Также ищем и удаляем ключи коллекций, содержащих данные об этом авторе + collection_keys = await redis.execute("KEYS", "authors:stats:*") + if collection_keys: + await redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей авторов") + else: + # Общая инвалидация всех кешей авторов + logger.debug("Полная инвалидация кеша авторов") + await invalidate_cache_by_prefix("authors") + @mutation.field("update_author") @login_required @@ -51,10 +222,30 @@ async def update_author(_, info, profile): @query.field("get_authors_all") -def get_authors_all(_, _info): - with local_session() as session: - authors = session.query(Author).all() - return authors +async def get_authors_all(_, _info): + """ + Получает список всех авторов без статистики. + + Returns: + list: Список всех авторов + """ + return await get_all_authors() + + +@query.field("get_authors_paginated") +async def get_authors_paginated(_, _info, limit=50, offset=0, by=None): + """ + Получает список авторов с пагинацией и статистикой. + + Args: + limit: Максимальное количество возвращаемых авторов + offset: Смещение для пагинации + by: Параметр сортировки (new/active) + + Returns: + list: Список авторов с их статистикой + """ + return await get_authors_with_stats(limit, offset, by) @query.field("get_author") @@ -105,145 +296,105 @@ async def get_author_id(_, _info, user: str): asyncio.create_task(cache_author(author_dict)) return author_with_stat except Exception as exc: - import traceback - - traceback.print_exc() - logger.error(exc) + logger.error(f"Error getting author: {exc}") + return None @query.field("load_authors_by") async def load_authors_by(_, _info, by, limit, offset): - logger.debug(f"loading authors by {by}") - authors_query = select(Author) + """ + Загружает авторов по заданному критерию с пагинацией. - if by.get("slug"): - authors_query = authors_query.filter(Author.slug.ilike(f"%{by['slug']}%")) - elif by.get("name"): - authors_query = authors_query.filter(Author.name.ilike(f"%{by['name']}%")) - elif by.get("topic"): - authors_query = ( - authors_query.join(ShoutAuthor) # Первое соединение ShoutAuthor - .join(ShoutTopic, ShoutAuthor.shout == ShoutTopic.shout) - .join(Topic, ShoutTopic.topic == Topic.id) - .filter(Topic.slug == str(by["topic"])) - ) + Args: + by: Критерий сортировки авторов (new/active) + limit: Максимальное количество возвращаемых авторов + offset: Смещение для пагинации - if by.get("last_seen"): # в unix time - before = int(time.time()) - by["last_seen"] - authors_query = authors_query.filter(Author.last_seen > before) - elif by.get("created_at"): # в unix time - before = int(time.time()) - by["created_at"] - authors_query = authors_query.filter(Author.created_at > before) - - authors_query = authors_query.limit(limit).offset(offset) - - with local_session() as session: - authors_nostat = session.execute(authors_query).all() - authors = [] - for a in authors_nostat: - if isinstance(a, Author): - author_dict = await get_cached_author(a.id, get_with_stat) - if author_dict and isinstance(author_dict.get("shouts"), int): - authors.append(author_dict) - - # order - order = by.get("order") - if order in ["shouts", "followers"]: - authors_query = authors_query.order_by(desc(text(f"{order}_stat"))) - - # group by - authors = get_with_stat(authors_query) - return authors or [] + Returns: + list: Список авторов с учетом критерия + """ + # Используем оптимизированную функцию для получения авторов + return await get_authors_with_stats(limit, offset, by) def get_author_id_from(slug="", user=None, author_id=None): - if not slug and not user and not author_id: - raise ValueError("One of slug, user, or author_id must be provided") - - author_query = select(Author.id) - if user: - author_query = author_query.filter(Author.user == user) - elif slug: - author_query = author_query.filter(Author.slug == slug) - elif author_id: - author_query = author_query.filter(Author.id == author_id) - - with local_session() as session: - author_id_result = session.execute(author_query).first() - author_id = author_id_result[0] if author_id_result else None - - if not author_id: - raise ValueError("Author not found") - + try: + author_id = None + if author_id: + return author_id + with local_session() as session: + author = None + if slug: + author = session.query(Author).filter(Author.slug == slug).first() + if author: + author_id = author.id + return author_id + if user: + author = session.query(Author).filter(Author.user == user).first() + if author: + author_id = author.id + except Exception as exc: + logger.error(exc) return author_id @query.field("get_author_follows") async def get_author_follows(_, _info, slug="", user=None, author_id=0): - try: - author_id = get_author_id_from(slug, user, author_id) + logger.debug(f"getting follows for @{slug}") + author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) + if not author_id: + return {} - if bool(author_id): - logger.debug(f"getting {author_id} follows authors") - authors = await get_cached_follower_authors(author_id) - topics = await get_cached_follower_topics(author_id) - return { - "topics": topics, - "authors": authors, - "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], - } - except Exception: - import traceback + followed_authors = await get_cached_follower_authors(author_id) + followed_topics = await get_cached_follower_topics(author_id) - traceback.print_exc() - return {"error": "Author not found"} + # TODO: Get followed communities too + return { + "authors": followed_authors, + "topics": followed_topics, + "communities": DEFAULT_COMMUNITIES, + "shouts": [], + } @query.field("get_author_follows_topics") async def get_author_follows_topics(_, _info, slug="", user=None, author_id=None): - try: - follower_id = get_author_id_from(slug, user, author_id) - topics = await get_cached_follower_topics(follower_id) - return topics - except Exception: - import traceback - - traceback.print_exc() + logger.debug(f"getting followed topics for @{slug}") + author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) + if not author_id: + return [] + followed_topics = await get_cached_follower_topics(author_id) + return followed_topics @query.field("get_author_follows_authors") async def get_author_follows_authors(_, _info, slug="", user=None, author_id=None): - try: - follower_id = get_author_id_from(slug, user, author_id) - return await get_cached_follower_authors(follower_id) - except Exception: - import traceback - - traceback.print_exc() + logger.debug(f"getting followed authors for @{slug}") + author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) + if not author_id: + return [] + followed_authors = await get_cached_follower_authors(author_id) + return followed_authors def create_author(user_id: str, slug: str, name: str = ""): + author = Author() + author.user = user_id # Связь с user_id из системы авторизации + author.slug = slug # Идентификатор из системы авторизации + author.created_at = author.updated_at = int(time.time()) + author.name = name or slug # если не указано + with local_session() as session: - try: - author = None - if user_id: - author = session.query(Author).filter(Author.user == user_id).first() - elif slug: - author = session.query(Author).filter(Author.slug == slug).first() - if not author: - new_author = Author(user=user_id, slug=slug, name=name) - session.add(new_author) - session.commit() - logger.info(f"author created by webhook {new_author.dict()}") - except Exception as exc: - logger.debug(exc) + session.add(author) + session.commit() + return author @query.field("get_author_followers") async def get_author_followers(_, _info, slug: str = "", user: str = "", author_id: int = 0): - logger.debug(f"getting followers for @{slug}") + logger.debug(f"getting followers for author @{slug} or ID:{author_id}") author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) - followers = [] - if author_id: - followers = await get_cached_author_followers(author_id) + if not author_id: + return [] + followers = await get_cached_author_followers(author_id) return followers diff --git a/resolvers/topic.py b/resolvers/topic.py index 9dfc245b..855d8a82 100644 --- a/resolvers/topic.py +++ b/resolvers/topic.py @@ -1,18 +1,15 @@ -import time - -from sqlalchemy import func, select, text +from sqlalchemy import desc, select, text from cache.cache import ( cache_topic, + cached_query, get_cached_topic_authors, get_cached_topic_by_slug, get_cached_topic_followers, - redis_operation, + invalidate_cache_by_prefix, ) -from cache.memorycache import cache_region from orm.author import Author -from orm.shout import Shout, ShoutTopic -from orm.topic import Topic, TopicFollower +from orm.topic import Topic from resolvers.stat import get_with_stat from services.auth import login_required from services.db import local_session @@ -30,42 +27,26 @@ async def get_all_topics(): Returns: list: Список всех тем без статистики """ - # Пытаемся получить данные из кеша - cached_topics = await redis_operation("GET", "topics:all:basic") + cache_key = "topics:all:basic" - if cached_topics: - logger.debug("Используем кешированные базовые данные о темах из Redis") - try: - import json + # Функция для получения всех тем из БД + async def fetch_all_topics(): + logger.debug("Получаем список всех тем из БД и кешируем результат") - return json.loads(cached_topics) - except Exception as e: - logger.error(f"Ошибка при десериализации тем из Redis: {e}") + with local_session() as session: + # Запрос на получение базовой информации о темах + topics_query = select(Topic) + topics = session.execute(topics_query).scalars().all() - # Если в кеше нет данных, выполняем запрос в БД - logger.debug("Получаем список всех тем из БД и кешируем результат") + # Преобразуем темы в словари + return [topic.dict() for topic in topics] - with local_session() as session: - # Запрос на получение базовой информации о темах - topics_query = select(Topic) - topics = session.execute(topics_query).scalars().all() - - # Преобразуем темы в словари - result = [topic.dict() for topic in topics] - - # Кешируем результат в Redis без TTL (будет обновляться только при изменениях) - try: - import json - - await redis_operation("SET", "topics:all:basic", json.dumps(result)) - except Exception as e: - logger.error(f"Ошибка при кешировании тем в Redis: {e}") - - return result + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_all_topics) # Вспомогательная функция для получения тем со статистикой с пагинацией -async def get_topics_with_stats(limit=100, offset=0, community_id=None): +async def get_topics_with_stats(limit=100, offset=0, community_id=None, by=None): """ Получает темы со статистикой с пагинацией. @@ -73,105 +54,140 @@ async def get_topics_with_stats(limit=100, offset=0, community_id=None): limit: Максимальное количество возвращаемых тем offset: Смещение для пагинации community_id: Опциональный ID сообщества для фильтрации + by: Опциональный параметр сортировки Returns: list: Список тем с их статистикой """ - # Формируем ключ кеша с учетом параметров - cache_key = f"topics:stats:limit={limit}:offset={offset}" - if community_id: - cache_key += f":community={community_id}" + # Формируем ключ кеша с помощью универсальной функции + cache_key = f"topics:stats:limit={limit}:offset={offset}:community_id={community_id}" - # Пытаемся получить данные из кеша - cached_topics = await redis_operation("GET", cache_key) + # Функция для получения тем из БД + async def fetch_topics_with_stats(): + logger.debug(f"Выполняем запрос на получение тем со статистикой: limit={limit}, offset={offset}") - if cached_topics: - logger.debug(f"Используем кешированные данные о темах из Redis: {cache_key}") - try: - import json + with local_session() as session: + # Базовый запрос для получения тем + base_query = select(Topic) - return json.loads(cached_topics) - except Exception as e: - logger.error(f"Ошибка при десериализации тем из Redis: {e}") + # Добавляем фильтр по сообществу, если указан + if community_id: + base_query = base_query.where(Topic.community == community_id) - # Если в кеше нет данных, выполняем оптимизированный запрос - logger.debug(f"Выполняем запрос на получение тем со статистикой: limit={limit}, offset={offset}") + # Применяем сортировку на основе параметра by + if by: + if isinstance(by, dict): + # Обработка словаря параметров сортировки + for field, direction in by.items(): + column = getattr(Topic, field, None) + if column: + if direction.lower() == "desc": + base_query = base_query.order_by(desc(column)) + else: + base_query = base_query.order_by(column) + elif by == "popular": + # Сортировка по популярности (количеству публикаций) + # Примечание: это требует дополнительного запроса или подзапроса + base_query = base_query.order_by( + desc(Topic.id) + ) # Временно, нужно заменить на proper implementation + else: + # По умолчанию сортируем по ID в обратном порядке + base_query = base_query.order_by(desc(Topic.id)) + else: + # По умолчанию сортируем по ID в обратном порядке + base_query = base_query.order_by(desc(Topic.id)) - with local_session() as session: - # Базовый запрос для получения тем - base_query = select(Topic) + # Применяем лимит и смещение + base_query = base_query.limit(limit).offset(offset) - # Добавляем фильтр по сообществу, если указан - if community_id: - base_query = base_query.where(Topic.community == community_id) + # Получаем темы + topics = session.execute(base_query).scalars().all() + topic_ids = [topic.id for topic in topics] - # Применяем лимит и смещение - base_query = base_query.limit(limit).offset(offset) + if not topic_ids: + return [] - # Получаем темы - topics = session.execute(base_query).scalars().all() - topic_ids = [topic.id for topic in topics] + # Запрос на получение статистики по публикациям для выбранных тем + shouts_stats_query = f""" + SELECT st.topic, COUNT(DISTINCT s.id) as shouts_count + FROM shout_topic st + JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL + WHERE st.topic IN ({",".join(map(str, topic_ids))}) + GROUP BY st.topic + """ + shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))} - if not topic_ids: - return [] + # Запрос на получение статистики по подписчикам для выбранных тем + followers_stats_query = f""" + SELECT topic, COUNT(DISTINCT follower) as followers_count + FROM topic_followers + WHERE topic IN ({",".join(map(str, topic_ids))}) + GROUP BY topic + """ + followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))} - # Запрос на получение статистики по публикациям для выбранных тем - shouts_stats_query = f""" - SELECT st.topic, COUNT(DISTINCT s.id) as shouts_count - FROM shout_topic st - JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL - WHERE st.topic IN ({",".join(map(str, topic_ids))}) - GROUP BY st.topic - """ - shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))} + # Формируем результат с добавлением статистики + result = [] + for topic in topics: + topic_dict = topic.dict() + topic_dict["stat"] = { + "shouts": shouts_stats.get(topic.id, 0), + "followers": followers_stats.get(topic.id, 0), + } + result.append(topic_dict) - # Запрос на получение статистики по подписчикам для выбранных тем - followers_stats_query = f""" - SELECT topic, COUNT(DISTINCT follower) as followers_count - FROM topic_followers - WHERE topic IN ({",".join(map(str, topic_ids))}) - GROUP BY topic - """ - followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))} + # Кешируем каждую тему отдельно для использования в других функциях + await cache_topic(topic_dict) - # Формируем результат с добавлением статистики - result = [] - for topic in topics: - topic_dict = topic.dict() - topic_dict["stat"] = { - "shouts": shouts_stats.get(topic.id, 0), - "followers": followers_stats.get(topic.id, 0), - } - result.append(topic_dict) + return result - # Кешируем каждую тему отдельно для использования в других функциях - await cache_topic(topic_dict) - - # Кешируем полный результат в Redis без TTL (будет обновляться только при изменениях) - try: - import json - - await redis_operation("SET", cache_key, json.dumps(result)) - except Exception as e: - logger.error(f"Ошибка при кешировании тем в Redis: {e}") - - return result + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_topics_with_stats) # Функция для инвалидации кеша тем -async def invalidate_topics_cache(): +async def invalidate_topics_cache(topic_id=None): """ - Инвалидирует все кеши тем при изменении данных. + Инвалидирует кеши тем при изменении данных. + + Args: + topic_id: Опциональный ID темы для точечной инвалидации. + Если не указан, инвалидируются все кеши тем. """ - logger.debug("Инвалидация кеша тем") + if topic_id: + # Точечная инвалидация конкретной темы + logger.debug(f"Инвалидация кеша для темы #{topic_id}") + specific_keys = [ + f"topic:id:{topic_id}", + f"topic:authors:{topic_id}", + f"topic:followers:{topic_id}", + f"topic_shouts_{topic_id}", + ] - # Получаем все ключи, начинающиеся с "topics:" - topic_keys = await redis.execute("KEYS", "topics:*") + # Получаем slug темы, если есть + with local_session() as session: + topic = session.query(Topic).filter(Topic.id == topic_id).first() + if topic and topic.slug: + specific_keys.append(f"topic:slug:{topic.slug}") - if topic_keys: - # Удаляем все найденные ключи - await redis.execute("DEL", *topic_keys) - logger.debug(f"Удалено {len(topic_keys)} ключей кеша тем") + # Удаляем конкретные ключи + for key in specific_keys: + try: + await redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + except Exception as e: + logger.error(f"Ошибка при удалении ключа {key}: {e}") + + # Также ищем и удаляем ключи коллекций, содержащих данные об этой теме + collection_keys = await redis.execute("KEYS", "topics:stats:*") + if collection_keys: + await redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей тем") + else: + # Общая инвалидация всех кешей тем + logger.debug("Полная инвалидация кеша тем") + await invalidate_cache_by_prefix("topics") # Запрос на получение всех тем @@ -188,23 +204,24 @@ async def get_topics_all(_, _info): # Запрос на получение тем с пагинацией и статистикой @query.field("get_topics_paginated") -async def get_topics_paginated(_, _info, limit=100, offset=0): +async def get_topics_paginated(_, _info, limit=100, offset=0, by=None): """ Получает список тем с пагинацией и статистикой. Args: limit: Максимальное количество возвращаемых тем offset: Смещение для пагинации + by: Опциональные параметры сортировки Returns: list: Список тем с их статистикой """ - return await get_topics_with_stats(limit, offset) + return await get_topics_with_stats(limit, offset, None, by) # Запрос на получение тем по сообществу @query.field("get_topics_by_community") -async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0): +async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0, by=None): """ Получает список тем, принадлежащих указанному сообществу с пагинацией и статистикой. @@ -212,11 +229,12 @@ async def get_topics_by_community(_, _info, community_id: int, limit=100, offset community_id: ID сообщества limit: Максимальное количество возвращаемых тем offset: Смещение для пагинации + by: Опциональные параметры сортировки Returns: list: Список тем с их статистикой """ - return await get_topics_with_stats(limit, offset, community_id) + return await get_topics_with_stats(limit, offset, community_id, by) # Запрос на получение тем по автору @@ -268,14 +286,18 @@ async def update_topic(_, _info, topic_input): if not topic: return {"error": "topic not found"} else: + old_slug = topic.slug Topic.update(topic, topic_input) session.add(topic) session.commit() - # Инвалидируем кеш всех тем и конкретной темы - await invalidate_topics_cache() - await redis.execute("DEL", f"topic:slug:{slug}") - await redis.execute("DEL", f"topic:id:{topic.id}") + # Инвалидируем кеш только для этой конкретной темы + await invalidate_topics_cache(topic.id) + + # Если slug изменился, удаляем старый ключ + if old_slug != topic.slug: + await redis.execute("DEL", f"topic:slug:{old_slug}") + logger.debug(f"Удален ключ кеша для старого slug: {old_slug}") return {"topic": topic} diff --git a/services/db.py b/services/db.py index b81873ba..e9a6b58c 100644 --- a/services/db.py +++ b/services/db.py @@ -17,7 +17,7 @@ from sqlalchemy import ( exc, func, inspect, - text + text, ) from sqlalchemy.orm import Session, configure_mappers, declarative_base from sqlalchemy.sql.schema import Table