diff --git a/CHANGELOG.md b/CHANGELOG.md index 99a89d78..d25f8dcf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,89 @@ +#### [0.4.17] - 2025-03-26 +- Fixed `'Reaction' object is not subscriptable` error in hierarchical comments: + - Modified `get_reactions_with_stat()` to convert Reaction objects to dictionaries + - Added default values for limit/offset parameters + - Fixed `load_first_replies()` implementation with proper parameter passing + - Added doctest with example usage + - Limited child comments to 100 per parent for performance + +#### [0.4.16] - 2025-03-22 +- Added hierarchical comments pagination: + - Created new GraphQL query `load_comments_branch` for efficient loading of hierarchical comments + - Ability to load root comments with their first N replies + - Added pagination for both root and child comments + - Using existing `comments_count` field in `Stat` type to display number of replies + - Added special `first_replies` field to store first replies to a comment + - Optimized SQL queries for efficient loading of comment hierarchies + - Implemented flexible comment sorting system (by time, rating) + +#### [0.4.15] - 2025-03-22 +- Upgraded caching system described `docs/caching.md` +- Module `cache/memorycache.py` removed +- Enhanced caching system with backward compatibility: + - Unified cache key generation with support for existing naming patterns + - Improved Redis operation function with better error handling + - Updated precache module to use consistent Redis interface + - Integrated revalidator with the invalidation system for better performance + - Added comprehensive documentation for the caching system + - Enhanced cached_query to support template-based cache keys + - Standardized error handling across all cache operations +- Optimized cache invalidation system: + - Added targeted invalidation for individual entities (authors, topics) + - Improved revalidation manager with individual object processing + - Implemented batched processing for high-volume invalidations + - Reduced Redis operations by using precise key invalidation instead of prefix-based wipes + - Added special handling for slug changes in topics +- Unified caching system for all models: + - Implemented abstract functions `cache_data`, `get_cached_data` and `invalidate_cache_by_prefix` + - Added `cached_query` function for unified approach to query caching + - Updated resolvers `author.py` and `topic.py` to use the new caching API + - Improved logging for cache operations to simplify debugging + - Optimized Redis memory usage through key format unification +- Improved caching and sorting in Topic and Author modules: + - Added support for dictionary sorting parameters in `by` for both modules + - Optimized cache key generation for stable behavior with various parameters + - Enhanced sorting logic with direction support and arbitrary fields + - Added `by` parameter support in the API for getting topics by community +- Performance optimizations for author-related queries: + - Added SQLAlchemy-managed indexes to `Author`, `AuthorFollower`, `AuthorRating` and `AuthorBookmark` models + - Implemented persistent Redis caching for author queries without TTL (invalidated only on changes) + - Optimized author retrieval with separate endpoints: + - `get_authors_all` - returns all non-deleted authors without statistics + - `load_authors_by` - optimized to use caching and efficient sorting and pagination + - Improved SQL queries with optimized JOIN conditions and efficient filtering + - Added pre-aggregation of statistics (shouts count, followers count) in single efficient queries + - Implemented robust cache invalidation on author updates + - Created necessary indexes for author lookups by user ID, slug, and timestamps + +#### [0.4.14] - 2025-03-21 +- Significant performance improvements for topic queries: + - Added database indexes to optimize JOIN operations + - Implemented persistent Redis caching for topic queries (no TTL, invalidated only on changes) + - Optimized topic retrieval with separate endpoints for different use cases: + - `get_topics_all` - returns all topics without statistics for lightweight listing + - `get_topics_by_community` - adds pagination and optimized filtering by community + - Added SQLAlchemy-managed indexes directly in ORM models for automatic schema maintenance + - Created `sync_indexes()` function for automatic index synchronization during app startup + - Reduced database load by pre-aggregating statistics in optimized SQL queries + - Added robust cache invalidation on topic create/update/delete operations + - Improved query optimization with proper JOIN conditions and specific partial indexes + +#### [0.4.13] - 2025-03-20 +- Fixed Topic objects serialization error in cache/memorycache.py +- Improved CustomJSONEncoder to support SQLAlchemy models with dict() method +- Enhanced error handling in cache_on_arguments decorator +- Modified `load_reactions_by` to include deleted reactions when `include_deleted=true` for proper comment tree building +- Fixed featured/unfeatured logic in reaction processing: + - Dislike reactions now properly take precedence over likes + - Featured status now requires more than 4 likes from users with featured articles + - Removed unnecessary filters for deleted reactions since rating reactions are physically deleted + - Author's featured status now based on having non-deleted articles with featured_at + +#### [0.4.12] - 2025-03-19 +- `delete_reaction` detects comments and uses `deleted_at` update +- `check_to_unfeature` etc. update +- dogpile dep in `services/memorycache.py` optimized + #### [0.4.11] - 2025-02-12 - `create_draft` resolver requires draft_id fixed - `create_draft` resolver defaults body and title fields to empty string @@ -72,7 +158,7 @@ #### [0.4.4] - `followers_stat` removed for shout - sqlite3 support added -- `rating_stat` and `commented_stat` fixes +- `rating_stat` and `comments_count` fixes #### [0.4.3] - cache reimplemented @@ -228,22 +314,4 @@ #### [0.2.7] -- `loadFollowedReactions` now with `login_required` -- notifier service api draft -- added `shout` visibility kind in schema -- community isolated from author in orm - - -#### [0.2.6] -- redis connection pool -- auth context fixes -- communities orm, resolvers, schema - - -#### [0.2.5] -- restructured -- all users have their profiles as authors in core -- `gittask`, `inbox` and `auth` logics removed -- `settings` moved to base and now smaller -- new outside auth schema -- removed `gittask`, `auth`, `inbox`, `migration` +- `loadFollowedReactions` now with ` \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index facd6c06..b481d544 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,4 +13,6 @@ RUN pip install -r requirements.txt COPY . . -CMD ["python", "server.py"] \ No newline at end of file +EXPOSE 8000 + +CMD ["python", "-m", "granian", "main:app", "--interface", "asgi", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/cache/cache.py b/cache/cache.py index afd16990..5b8ea5d6 100644 --- a/cache/cache.py +++ b/cache/cache.py @@ -1,7 +1,37 @@ +""" +Caching system for the Discours platform +---------------------------------------- + +This module provides a comprehensive caching solution with these key components: + +1. KEY NAMING CONVENTIONS: + - Entity-based keys: "entity:property:value" (e.g., "author:id:123") + - Collection keys: "entity:collection:params" (e.g., "authors:stats:limit=10:offset=0") + - Special case keys: Maintained for backwards compatibility (e.g., "topic_shouts_123") + +2. CORE FUNCTIONS: + - cached_query(): High-level function for retrieving cached data or executing queries + +3. ENTITY-SPECIFIC FUNCTIONS: + - cache_author(), cache_topic(): Cache entity data + - get_cached_author(), get_cached_topic(): Retrieve entity data from cache + - invalidate_cache_by_prefix(): Invalidate all keys with a specific prefix + +4. CACHE INVALIDATION STRATEGY: + - Direct invalidation via invalidate_* functions for immediate changes + - Delayed invalidation via revalidation_manager for background processing + - Event-based triggers for automatic cache updates (see triggers.py) + +To maintain consistency with the existing codebase, this module preserves +the original key naming patterns while providing a more structured approach +for new cache operations. +""" + import asyncio import json -from typing import List +from typing import Any, Dict, List, Optional, Union +import orjson from sqlalchemy import and_, join, select from orm.author import Author, AuthorFollower @@ -19,8 +49,10 @@ DEFAULT_FOLLOWS = { "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], } -CACHE_TTL = 300 # 5 минут +CACHE_TTL = 300 # 5 minutes +# Key templates for common entity types +# These are used throughout the codebase and should be maintained for compatibility CACHE_KEYS = { "TOPIC_ID": "topic:id:{}", "TOPIC_SLUG": "topic:slug:{}", @@ -37,8 +69,8 @@ CACHE_KEYS = { async def cache_topic(topic: dict): payload = json.dumps(topic, cls=CustomJSONEncoder) await asyncio.gather( - redis_operation("SET", f"topic:id:{topic['id']}", payload), - redis_operation("SET", f"topic:slug:{topic['slug']}", payload), + redis.execute("SET", f"topic:id:{topic['id']}", payload), + redis.execute("SET", f"topic:slug:{topic['slug']}", payload), ) @@ -46,30 +78,30 @@ async def cache_topic(topic: dict): async def cache_author(author: dict): payload = json.dumps(author, cls=CustomJSONEncoder) await asyncio.gather( - redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])), - redis_operation("SET", f"author:id:{author['id']}", payload), + redis.execute("SET", f"author:user:{author['user'].strip()}", str(author["id"])), + redis.execute("SET", f"author:id:{author['id']}", payload), ) # Cache follows data async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True): key = f"author:follows-{entity_type}s:{follower_id}" - follows_str = await redis_operation("GET", key) - follows = json.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] + follows_str = await redis.execute("GET", key) + follows = orjson.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] if is_insert: if entity_id not in follows: follows.append(entity_id) else: follows = [eid for eid in follows if eid != entity_id] - await redis_operation("SET", key, json.dumps(follows, cls=CustomJSONEncoder)) + await redis.execute("SET", key, json.dumps(follows, cls=CustomJSONEncoder)) await update_follower_stat(follower_id, entity_type, len(follows)) # Update follower statistics async def update_follower_stat(follower_id, entity_type, count): follower_key = f"author:id:{follower_id}" - follower_str = await redis_operation("GET", follower_key) - follower = json.loads(follower_str) if follower_str else None + follower_str = await redis.execute("GET", follower_key) + follower = orjson.loads(follower_str) if follower_str else None if follower: follower["stat"] = {f"{entity_type}s": count} await cache_author(follower) @@ -78,9 +110,9 @@ async def update_follower_stat(follower_id, entity_type, count): # Get author from cache async def get_cached_author(author_id: int, get_with_stat): author_key = f"author:id:{author_id}" - result = await redis_operation("GET", author_key) + result = await redis.execute("GET", author_key) if result: - return json.loads(result) + return orjson.loads(result) # Load from database if not found in cache q = select(Author).where(Author.id == author_id) authors = get_with_stat(q) @@ -103,16 +135,16 @@ async def get_cached_topic(topic_id: int): dict: Topic data or None if not found. """ topic_key = f"topic:id:{topic_id}" - cached_topic = await redis_operation("GET", topic_key) + cached_topic = await redis.execute("GET", topic_key) if cached_topic: - return json.loads(cached_topic) + return orjson.loads(cached_topic) # If not in cache, fetch from the database with local_session() as session: topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none() if topic: topic_dict = topic.dict() - await redis_operation("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) + await redis.execute("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) return topic_dict return None @@ -121,9 +153,9 @@ async def get_cached_topic(topic_id: int): # Get topic by slug from cache async def get_cached_topic_by_slug(slug: str, get_with_stat): topic_key = f"topic:slug:{slug}" - result = await redis_operation("GET", topic_key) + result = await redis.execute("GET", topic_key) if result: - return json.loads(result) + return orjson.loads(result) # Load from database if not found in cache topic_query = select(Topic).where(Topic.slug == slug) topics = get_with_stat(topic_query) @@ -138,8 +170,8 @@ async def get_cached_topic_by_slug(slug: str, get_with_stat): async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]: # Fetch all author data concurrently keys = [f"author:id:{author_id}" for author_id in author_ids] - results = await asyncio.gather(*(redis_operation("GET", key) for key in keys)) - authors = [json.loads(result) if result else None for result in results] + results = await asyncio.gather(*(redis.execute("GET", key) for key in keys)) + authors = [orjson.loads(result) if result else None for result in results] # Load missing authors from database and cache missing_indices = [index for index, author in enumerate(authors) if author is None] if missing_indices: @@ -165,10 +197,10 @@ async def get_cached_topic_followers(topic_id: int): """ try: cache_key = CACHE_KEYS["TOPIC_FOLLOWERS"].format(topic_id) - cached = await redis_operation("GET", cache_key) + cached = await redis.execute("GET", cache_key) if cached: - followers_ids = json.loads(cached) + followers_ids = orjson.loads(cached) logger.debug(f"Found {len(followers_ids)} cached followers for topic #{topic_id}") return await get_cached_authors_by_ids(followers_ids) @@ -181,7 +213,7 @@ async def get_cached_topic_followers(topic_id: int): .all() ] - await redis_operation("SETEX", cache_key, value=json.dumps(followers_ids), ttl=CACHE_TTL) + await redis.execute("SETEX", cache_key, CACHE_TTL, orjson.dumps(followers_ids)) followers = await get_cached_authors_by_ids(followers_ids) logger.debug(f"Cached {len(followers)} followers for topic #{topic_id}") return followers @@ -194,9 +226,9 @@ async def get_cached_topic_followers(topic_id: int): # Get cached author followers async def get_cached_author_followers(author_id: int): # Check cache for data - cached = await redis_operation("GET", f"author:followers:{author_id}") + cached = await redis.execute("GET", f"author:followers:{author_id}") if cached: - followers_ids = json.loads(cached) + followers_ids = orjson.loads(cached) followers = await get_cached_authors_by_ids(followers_ids) logger.debug(f"Cached followers for author #{author_id}: {len(followers)}") return followers @@ -210,7 +242,7 @@ async def get_cached_author_followers(author_id: int): .filter(AuthorFollower.author == author_id, Author.id != author_id) .all() ] - await redis_operation("SET", f"author:followers:{author_id}", json.dumps(followers_ids)) + await redis.execute("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids)) followers = await get_cached_authors_by_ids(followers_ids) return followers @@ -218,9 +250,9 @@ async def get_cached_author_followers(author_id: int): # Get cached follower authors async def get_cached_follower_authors(author_id: int): # Attempt to retrieve authors from cache - cached = await redis_operation("GET", f"author:follows-authors:{author_id}") + cached = await redis.execute("GET", f"author:follows-authors:{author_id}") if cached: - authors_ids = json.loads(cached) + authors_ids = orjson.loads(cached) else: # Query authors from database with local_session() as session: @@ -232,7 +264,7 @@ async def get_cached_follower_authors(author_id: int): .where(AuthorFollower.follower == author_id) ).all() ] - await redis_operation("SET", f"author:follows-authors:{author_id}", json.dumps(authors_ids)) + await redis.execute("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids)) authors = await get_cached_authors_by_ids(authors_ids) return authors @@ -241,9 +273,9 @@ async def get_cached_follower_authors(author_id: int): # Get cached follower topics async def get_cached_follower_topics(author_id: int): # Attempt to retrieve topics from cache - cached = await redis_operation("GET", f"author:follows-topics:{author_id}") + cached = await redis.execute("GET", f"author:follows-topics:{author_id}") if cached: - topics_ids = json.loads(cached) + topics_ids = orjson.loads(cached) else: # Load topics from database and cache them with local_session() as session: @@ -254,13 +286,13 @@ async def get_cached_follower_topics(author_id: int): .where(TopicFollower.follower == author_id) .all() ] - await redis_operation("SET", f"author:follows-topics:{author_id}", json.dumps(topics_ids)) + await redis.execute("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids)) topics = [] for topic_id in topics_ids: - topic_str = await redis_operation("GET", f"topic:id:{topic_id}") + topic_str = await redis.execute("GET", f"topic:id:{topic_id}") if topic_str: - topic = json.loads(topic_str) + topic = orjson.loads(topic_str) if topic and topic not in topics: topics.append(topic) @@ -280,12 +312,12 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat): dict: Dictionary with author data or None if not found. """ # Attempt to find author ID by user_id in Redis cache - author_id = await redis_operation("GET", f"author:user:{user_id.strip()}") + author_id = await redis.execute("GET", f"author:user:{user_id.strip()}") if author_id: # If ID is found, get full author data by ID - author_data = await redis_operation("GET", f"author:id:{author_id}") + author_data = await redis.execute("GET", f"author:id:{author_id}") if author_data: - return json.loads(author_data) + return orjson.loads(author_data) # If data is not found in cache, query the database author_query = select(Author).where(Author.user == user_id) @@ -295,8 +327,8 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat): author = authors[0] author_dict = author.dict() await asyncio.gather( - redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)), - redis_operation("SET", f"author:id:{author.id}", json.dumps(author_dict)), + redis.execute("SET", f"author:user:{user_id.strip()}", str(author.id)), + redis.execute("SET", f"author:id:{author.id}", orjson.dumps(author_dict)), ) return author_dict @@ -317,9 +349,9 @@ async def get_cached_topic_authors(topic_id: int): """ # Attempt to get a list of author IDs from cache rkey = f"topic:authors:{topic_id}" - cached_authors_ids = await redis_operation("GET", rkey) + cached_authors_ids = await redis.execute("GET", rkey) if cached_authors_ids: - authors_ids = json.loads(cached_authors_ids) + authors_ids = orjson.loads(cached_authors_ids) else: # If cache is empty, get data from the database with local_session() as session: @@ -331,7 +363,7 @@ async def get_cached_topic_authors(topic_id: int): ) authors_ids = [author_id for (author_id,) in session.execute(query).all()] # Cache the retrieved author IDs - await redis_operation("SET", rkey, json.dumps(authors_ids)) + await redis.execute("SET", rkey, orjson.dumps(authors_ids)) # Retrieve full author details from cached IDs if authors_ids: @@ -352,11 +384,11 @@ async def invalidate_shouts_cache(cache_keys: List[str]): cache_key = f"shouts:{key}" # Удаляем основной кэш - await redis_operation("DEL", cache_key) + await redis.execute("DEL", cache_key) logger.debug(f"Invalidated cache key: {cache_key}") # Добавляем ключ в список инвалидированных с TTL - await redis_operation("SETEX", f"{cache_key}:invalidated", value="1", ttl=CACHE_TTL) + await redis.execute("SETEX", f"{cache_key}:invalidated", CACHE_TTL, "1") # Если это кэш темы, инвалидируем также связанные ключи if key.startswith("topic_"): @@ -368,7 +400,7 @@ async def invalidate_shouts_cache(cache_keys: List[str]): f"topic:stats:{topic_id}", ] for related_key in related_keys: - await redis_operation("DEL", related_key) + await redis.execute("DEL", related_key) logger.debug(f"Invalidated related key: {related_key}") except Exception as e: @@ -379,15 +411,15 @@ async def cache_topic_shouts(topic_id: int, shouts: List[dict]): """Кэширует список публикаций для темы""" key = f"topic_shouts_{topic_id}" payload = json.dumps(shouts, cls=CustomJSONEncoder) - await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL) + await redis.execute("SETEX", key, CACHE_TTL, payload) async def get_cached_topic_shouts(topic_id: int) -> List[dict]: """Получает кэшированный список публикаций для темы""" key = f"topic_shouts_{topic_id}" - cached = await redis_operation("GET", key) + cached = await redis.execute("GET", key) if cached: - return json.loads(cached) + return orjson.loads(cached) return None @@ -431,27 +463,7 @@ async def invalidate_shout_related_cache(shout: Shout, author_id: int): await invalidate_shouts_cache(list(cache_keys)) -async def redis_operation(operation: str, key: str, value=None, ttl=None): - """ - Унифицированная функция для работы с Redis - - Args: - operation: 'GET', 'SET', 'DEL', 'SETEX' - key: ключ - value: значение (для SET/SETEX) - ttl: время жизни в секундах (для SETEX) - """ - try: - if operation == "GET": - return await redis.execute("GET", key) - elif operation == "SET": - await redis.execute("SET", key, value) - elif operation == "SETEX": - await redis.execute("SETEX", key, ttl or CACHE_TTL, value) - elif operation == "DEL": - await redis.execute("DEL", key) - except Exception as e: - logger.error(f"Redis {operation} error for key {key}: {e}") +# Function removed - direct Redis calls used throughout the module instead async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_method): @@ -465,9 +477,9 @@ async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_ cache_method: метод кэширования """ key = f"{entity_type}:id:{entity_id}" - cached = await redis_operation("GET", key) + cached = await redis.execute("GET", key) if cached: - return json.loads(cached) + return orjson.loads(cached) entity = await get_method(entity_id) if entity: @@ -496,3 +508,120 @@ async def cache_by_id(entity, entity_id: int, cache_method): d = x.dict() await cache_method(d) return d + + +# Универсальная функция для сохранения данных в кеш +async def cache_data(key: str, data: Any, ttl: Optional[int] = None) -> None: + """ + Сохраняет данные в кеш по указанному ключу. + + Args: + key: Ключ кеша + data: Данные для сохранения + ttl: Время жизни кеша в секундах (None - бессрочно) + """ + try: + payload = json.dumps(data, cls=CustomJSONEncoder) + if ttl: + await redis.execute("SETEX", key, ttl, payload) + else: + await redis.execute("SET", key, payload) + logger.debug(f"Данные сохранены в кеш по ключу {key}") + except Exception as e: + logger.error(f"Ошибка при сохранении данных в кеш: {e}") + + +# Универсальная функция для получения данных из кеша +async def get_cached_data(key: str) -> Optional[Any]: + """ + Получает данные из кеша по указанному ключу. + + Args: + key: Ключ кеша + + Returns: + Any: Данные из кеша или None, если данных нет + """ + try: + cached_data = await redis.execute("GET", key) + if cached_data: + logger.debug(f"Данные получены из кеша по ключу {key}") + return orjson.loads(cached_data) + return None + except Exception as e: + logger.error(f"Ошибка при получении данных из кеша: {e}") + return None + + +# Универсальная функция для инвалидации кеша по префиксу +async def invalidate_cache_by_prefix(prefix: str) -> None: + """ + Инвалидирует все ключи кеша с указанным префиксом. + + Args: + prefix: Префикс ключей кеша для инвалидации + """ + try: + keys = await redis.execute("KEYS", f"{prefix}:*") + if keys: + await redis.execute("DEL", *keys) + logger.debug(f"Удалено {len(keys)} ключей кеша с префиксом {prefix}") + except Exception as e: + logger.error(f"Ошибка при инвалидации кеша: {e}") + + +# Универсальная функция для получения и кеширования данных +async def cached_query( + cache_key: str, + query_func: callable, + ttl: Optional[int] = None, + force_refresh: bool = False, + use_key_format: bool = True, + **query_params, +) -> Any: + """ + Gets data from cache or executes query and saves result to cache. + Supports existing key formats for compatibility. + + Args: + cache_key: Cache key or key template from CACHE_KEYS + query_func: Function to execute the query + ttl: Cache TTL in seconds (None - indefinite) + force_refresh: Force cache refresh + use_key_format: Whether to check if cache_key matches a key template in CACHE_KEYS + **query_params: Parameters to pass to the query function + + Returns: + Any: Data from cache or query result + """ + # Check if cache_key matches a pattern in CACHE_KEYS + actual_key = cache_key + if use_key_format and "{}" in cache_key: + # Look for a template match in CACHE_KEYS + for key_name, key_format in CACHE_KEYS.items(): + if cache_key == key_format: + # We have a match, now look for the id or value to format with + for param_name, param_value in query_params.items(): + if param_name in ["id", "slug", "user", "topic_id", "author_id"]: + actual_key = cache_key.format(param_value) + break + + # If not forcing refresh, try to get data from cache + if not force_refresh: + cached_result = await get_cached_data(actual_key) + if cached_result is not None: + return cached_result + + # If data not in cache or refresh required, execute query + try: + result = await query_func(**query_params) + if result is not None: + # Save result to cache + await cache_data(actual_key, result, ttl) + return result + except Exception as e: + logger.error(f"Error executing query for caching: {e}") + # In case of error, return data from cache if not forcing refresh + if not force_refresh: + return await get_cached_data(actual_key) + raise diff --git a/cache/memorycache.py b/cache/memorycache.py deleted file mode 100644 index 003c863d..00000000 --- a/cache/memorycache.py +++ /dev/null @@ -1,11 +0,0 @@ -from dogpile.cache import make_region - -from settings import REDIS_URL - -# Создание региона кэша с TTL -cache_region = make_region() -cache_region.configure( - "dogpile.cache.redis", - arguments={"url": f"{REDIS_URL}/1"}, - expiration_time=3600, # Cache expiration time in seconds -) diff --git a/cache/precache.py b/cache/precache.py index 94b39960..23844024 100644 --- a/cache/precache.py +++ b/cache/precache.py @@ -86,11 +86,15 @@ async def precache_data(): # Преобразуем словарь в список аргументов для HSET if value: - flattened = [] - for field, val in value.items(): - flattened.extend([field, val]) - - await redis.execute("HSET", key, *flattened) + # Если значение - словарь, преобразуем его в плоский список для HSET + if isinstance(value, dict): + flattened = [] + for field, val in value.items(): + flattened.extend([field, val]) + await redis.execute("HSET", key, *flattened) + else: + # Предполагаем, что значение уже содержит список + await redis.execute("HSET", key, *value) logger.info(f"redis hash '{key}' was restored") with local_session() as session: diff --git a/cache/revalidator.py b/cache/revalidator.py index 125b9f5f..7be5041c 100644 --- a/cache/revalidator.py +++ b/cache/revalidator.py @@ -1,17 +1,26 @@ import asyncio -from cache.cache import cache_author, cache_topic, get_cached_author, get_cached_topic +from cache.cache import ( + cache_author, + cache_topic, + get_cached_author, + get_cached_topic, + invalidate_cache_by_prefix, +) from resolvers.stat import get_with_stat from utils.logger import root_logger as logger +CACHE_REVALIDATION_INTERVAL = 300 # 5 minutes + class CacheRevalidationManager: - def __init__(self, interval=60): + def __init__(self, interval=CACHE_REVALIDATION_INTERVAL): """Инициализация менеджера с заданным интервалом проверки (в секундах).""" self.interval = interval self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()} self.lock = asyncio.Lock() self.running = True + self.MAX_BATCH_SIZE = 10 # Максимальное количество элементов для поштучной обработки async def start(self): """Запуск фонового воркера для ревалидации кэша.""" @@ -32,22 +41,107 @@ class CacheRevalidationManager: """Обновление кэша для всех сущностей, требующих ревалидации.""" async with self.lock: # Ревалидация кэша авторов - for author_id in self.items_to_revalidate["authors"]: - author = await get_cached_author(author_id, get_with_stat) - if author: - await cache_author(author) - self.items_to_revalidate["authors"].clear() + if self.items_to_revalidate["authors"]: + logger.debug(f"Revalidating {len(self.items_to_revalidate['authors'])} authors") + for author_id in self.items_to_revalidate["authors"]: + if author_id == "all": + await invalidate_cache_by_prefix("authors") + break + author = await get_cached_author(author_id, get_with_stat) + if author: + await cache_author(author) + self.items_to_revalidate["authors"].clear() # Ревалидация кэша тем - for topic_id in self.items_to_revalidate["topics"]: - topic = await get_cached_topic(topic_id) - if topic: - await cache_topic(topic) - self.items_to_revalidate["topics"].clear() + if self.items_to_revalidate["topics"]: + logger.debug(f"Revalidating {len(self.items_to_revalidate['topics'])} topics") + for topic_id in self.items_to_revalidate["topics"]: + if topic_id == "all": + await invalidate_cache_by_prefix("topics") + break + topic = await get_cached_topic(topic_id) + if topic: + await cache_topic(topic) + self.items_to_revalidate["topics"].clear() + + # Ревалидация шаутов (публикаций) + if self.items_to_revalidate["shouts"]: + shouts_count = len(self.items_to_revalidate["shouts"]) + logger.debug(f"Revalidating {shouts_count} shouts") + + # Проверяем наличие специального флага 'all' + if "all" in self.items_to_revalidate["shouts"]: + await invalidate_cache_by_prefix("shouts") + # Если элементов много, но не 'all', используем специфический подход + elif shouts_count > self.MAX_BATCH_SIZE: + # Инвалидируем только collections keys, которые затрагивают много сущностей + collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "shouts:*")) + if collection_keys: + await self._redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей шаутов") + + # Обновляем кеш каждого конкретного шаута + for shout_id in self.items_to_revalidate["shouts"]: + if shout_id != "all": + # Точечная инвалидация для каждого shout_id + specific_keys = [f"shout:id:{shout_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + else: + # Если элементов немного, обрабатываем каждый + for shout_id in self.items_to_revalidate["shouts"]: + if shout_id != "all": + # Точечная инвалидация для каждого shout_id + specific_keys = [f"shout:id:{shout_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + + self.items_to_revalidate["shouts"].clear() + + # Аналогично для реакций - точечная инвалидация + if self.items_to_revalidate["reactions"]: + reactions_count = len(self.items_to_revalidate["reactions"]) + logger.debug(f"Revalidating {reactions_count} reactions") + + if "all" in self.items_to_revalidate["reactions"]: + await invalidate_cache_by_prefix("reactions") + elif reactions_count > self.MAX_BATCH_SIZE: + # Инвалидируем только collections keys для реакций + collection_keys = await asyncio.create_task(self._redis.execute("KEYS", "reactions:*")) + if collection_keys: + await self._redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей реакций") + + # Точечная инвалидация для каждой реакции + for reaction_id in self.items_to_revalidate["reactions"]: + if reaction_id != "all": + specific_keys = [f"reaction:id:{reaction_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + else: + # Точечная инвалидация для каждой реакции + for reaction_id in self.items_to_revalidate["reactions"]: + if reaction_id != "all": + specific_keys = [f"reaction:id:{reaction_id}"] + for key in specific_keys: + await self._redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + + self.items_to_revalidate["reactions"].clear() def mark_for_revalidation(self, entity_id, entity_type): """Отметить сущность для ревалидации.""" - self.items_to_revalidate[entity_type].add(entity_id) + if entity_id and entity_type: + self.items_to_revalidate[entity_type].add(entity_id) + + def invalidate_all(self, entity_type): + """Пометить для инвалидации все элементы указанного типа.""" + logger.debug(f"Marking all {entity_type} for invalidation") + # Особый флаг для полной инвалидации + self.items_to_revalidate[entity_type].add("all") async def stop(self): """Остановка фонового воркера.""" @@ -60,4 +154,4 @@ class CacheRevalidationManager: pass -revalidation_manager = CacheRevalidationManager(interval=300) # Ревалидация каждые 5 минут +revalidation_manager = CacheRevalidationManager() diff --git a/docs/caching.md b/docs/caching.md new file mode 100644 index 00000000..7c025be2 --- /dev/null +++ b/docs/caching.md @@ -0,0 +1,279 @@ +# Система кеширования Discours + +## Общее описание + +Система кеширования Discours - это комплексное решение для повышения производительности платформы. Она использует Redis для хранения часто запрашиваемых данных и уменьшения нагрузки на основную базу данных. + +Кеширование реализовано как многоуровневая система, состоящая из нескольких модулей: + +- `cache.py` - основной модуль с функциями кеширования +- `revalidator.py` - асинхронный менеджер ревалидации кеша +- `triggers.py` - триггеры событий SQLAlchemy для автоматической ревалидации +- `precache.py` - предварительное кеширование данных при старте приложения + +## Ключевые компоненты + +### 1. Форматы ключей кеша + +Система поддерживает несколько форматов ключей для обеспечения совместимости и удобства использования: + +- **Ключи сущностей**: `entity:property:value` (например, `author:id:123`) +- **Ключи коллекций**: `entity:collection:params` (например, `authors:stats:limit=10:offset=0`) +- **Специальные ключи**: для обратной совместимости (например, `topic_shouts_123`) + +Все стандартные форматы ключей хранятся в словаре `CACHE_KEYS`: + +```python +CACHE_KEYS = { + "TOPIC_ID": "topic:id:{}", + "TOPIC_SLUG": "topic:slug:{}", + "AUTHOR_ID": "author:id:{}", + # и другие... +} +``` + +### 2. Основные функции кеширования + +#### Структура ключей + +Вместо генерации ключей через вспомогательные функции, система следует строгим конвенциям формирования ключей: + +1. **Ключи для отдельных сущностей** строятся по шаблону: + ``` + entity:property:value + ``` + Например: + - `topic:id:123` - тема с ID 123 + - `author:slug:john-doe` - автор со слагом "john-doe" + - `shout:id:456` - публикация с ID 456 + +2. **Ключи для коллекций** строятся по шаблону: + ``` + entity:collection[:filter1=value1:filter2=value2:...] + ``` + Например: + - `topics:all:basic` - базовый список всех тем + - `authors:stats:limit=10:offset=0:sort=name` - отсортированный список авторов с пагинацией + - `shouts:feed:limit=20:community=1` - лента публикаций с фильтром по сообществу + +3. **Специальные форматы ключей** для обратной совместимости: + ``` + entity_action_id + ``` + Например: + - `topic_shouts_123` - публикации для темы с ID 123 + +Во всех модулях системы разработчики должны явно формировать ключи в соответствии с этими конвенциями, что обеспечивает единообразие и предсказуемость кеширования. + +#### Работа с данными в кеше + +```python +async def cache_data(key, data, ttl=None) +async def get_cached_data(key) +``` + +Эти функции предоставляют универсальный интерфейс для сохранения и получения данных из кеша. Они напрямую используют Redis через вызовы `redis.execute()`. + +#### Высокоуровневое кеширование запросов + +```python +async def cached_query(cache_key, query_func, ttl=None, force_refresh=False, **query_params) +``` + +Функция `cached_query` объединяет получение данных из кеша и выполнение запроса в случае отсутствия данных в кеше. Это основная функция, которую следует использовать в резолверах для кеширования результатов запросов. + +### 3. Кеширование сущностей + +Для основных типов сущностей реализованы специальные функции: + +```python +async def cache_topic(topic: dict) +async def cache_author(author: dict) +async def get_cached_topic(topic_id: int) +async def get_cached_author(author_id: int, get_with_stat) +``` + +Эти функции упрощают работу с часто используемыми типами данных и обеспечивают единообразный подход к их кешированию. + +### 4. Работа со связями + +Для работы со связями между сущностями предназначены функции: + +```python +async def cache_follows(follower_id, entity_type, entity_id, is_insert=True) +async def get_cached_topic_followers(topic_id) +async def get_cached_author_followers(author_id) +async def get_cached_follower_topics(author_id) +``` + +Они позволяют эффективно кешировать и получать информацию о подписках, связях между авторами, темами и публикациями. + +## Система инвалидации кеша + +### 1. Прямая инвалидация + +Система поддерживает два типа инвалидации кеша: + +#### 1.1. Инвалидация по префиксу + +```python +async def invalidate_cache_by_prefix(prefix) +``` + +Позволяет инвалидировать все ключи кеша, начинающиеся с указанного префикса. Используется в резолверах для инвалидации группы кешей при массовых изменениях. + +#### 1.2. Точечная инвалидация + +```python +async def invalidate_authors_cache(author_id=None) +async def invalidate_topics_cache(topic_id=None) +``` + +Эти функции позволяют инвалидировать кеш только для конкретной сущности, что снижает нагрузку на Redis и предотвращает ненужную потерю кешированных данных. Если ID сущности не указан, используется инвалидация по префиксу. + +Примеры использования точечной инвалидации: + +```python +# Инвалидация кеша только для автора с ID 123 +await invalidate_authors_cache(123) + +# Инвалидация кеша только для темы с ID 456 +await invalidate_topics_cache(456) +``` + +### 2. Отложенная инвалидация + +Модуль `revalidator.py` реализует систему отложенной инвалидации кеша через класс `CacheRevalidationManager`: + +```python +class CacheRevalidationManager: + # ... + async def process_revalidation(self): + # ... + def mark_for_revalidation(self, entity_id, entity_type): + # ... +``` + +Менеджер ревалидации работает как асинхронный фоновый процесс, который периодически (по умолчанию каждые 5 минут) проверяет наличие сущностей для ревалидации. + +Особенности реализации: +- Для авторов и тем используется поштучная ревалидация каждой записи +- Для шаутов и реакций используется батчевая обработка, с порогом в 10 элементов +- При достижении порога система переключается на инвалидацию коллекций вместо поштучной обработки +- Специальный флаг `all` позволяет запустить полную инвалидацию всех записей типа + +### 3. Автоматическая инвалидация через триггеры + +Модуль `triggers.py` регистрирует обработчики событий SQLAlchemy, которые автоматически отмечают сущности для ревалидации при изменении данных в базе: + +```python +def events_register(): + event.listen(Author, "after_update", mark_for_revalidation) + event.listen(Topic, "after_update", mark_for_revalidation) + # и другие... +``` + +Триггеры имеют следующие особенности: +- Реагируют на события вставки, обновления и удаления +- Отмечают затронутые сущности для отложенной ревалидации +- Учитывают связи между сущностями (например, при изменении темы обновляются связанные шауты) + +## Предварительное кеширование + +Модуль `precache.py` реализует предварительное кеширование часто используемых данных при старте приложения: + +```python +async def precache_data(): + # ... +``` + +Эта функция выполняется при запуске приложения и заполняет кеш данными, которые будут часто запрашиваться пользователями. + +## Примеры использования + +### Простое кеширование результата запроса + +```python +async def get_topics_with_stats(limit=10, offset=0, by="title"): + # Формирование ключа кеша по конвенции + cache_key = f"topics:stats:limit={limit}:offset={offset}:sort={by}" + + cached_data = await get_cached_data(cache_key) + if cached_data: + return cached_data + + # Выполнение запроса к базе данных + result = ... # логика получения данных + + await cache_data(cache_key, result, ttl=300) + return result +``` + +### Использование обобщенной функции cached_query + +```python +async def get_topics_with_stats(limit=10, offset=0, by="title"): + async def fetch_data(limit, offset, by): + # Логика получения данных + return result + + # Формирование ключа кеша по конвенции + cache_key = f"topics:stats:limit={limit}:offset={offset}:sort={by}" + + return await cached_query( + cache_key, + fetch_data, + ttl=300, + limit=limit, + offset=offset, + by=by + ) +``` + +### Точечная инвалидация кеша при изменении данных + +```python +async def update_topic(topic_id, new_data): + # Обновление данных в базе + # ... + + # Точечная инвалидация кеша только для измененной темы + await invalidate_topics_cache(topic_id) + + return updated_topic +``` + +## Отладка и мониторинг + +Система кеширования использует логгер для отслеживания операций: + +```python +logger.debug(f"Данные получены из кеша по ключу {key}") +logger.debug(f"Удалено {len(keys)} ключей кеша с префиксом {prefix}") +logger.error(f"Ошибка при инвалидации кеша: {e}") +``` + +Это позволяет отслеживать работу кеша и выявлять возможные проблемы на ранних стадиях. + +## Рекомендации по использованию + +1. **Следуйте конвенциям формирования ключей** - это критически важно для консистентности и предсказуемости кеша. +2. **Не создавайте собственные форматы ключей** - используйте существующие шаблоны для обеспечения единообразия. +3. **Не забывайте об инвалидации** - всегда инвалидируйте кеш при изменении данных. +4. **Используйте точечную инвалидацию** - вместо инвалидации по префиксу для снижения нагрузки на Redis. +5. **Устанавливайте разумные TTL** - используйте разные значения TTL в зависимости от частоты изменения данных. +6. **Не кешируйте большие объемы данных** - кешируйте только то, что действительно необходимо для повышения производительности. + +## Технические детали реализации + +- **Сериализация данных**: используется `orjson` для эффективной сериализации и десериализации данных. +- **Форматирование даты и времени**: для корректной работы с датами используется `CustomJSONEncoder`. +- **Асинхронность**: все операции кеширования выполняются асинхронно для минимального влияния на производительность API. +- **Прямое взаимодействие с Redis**: все операции выполняются через прямые вызовы `redis.execute()` с обработкой ошибок. +- **Батчевая обработка**: для массовых операций используется пороговое значение, после которого применяются оптимизированные стратегии. + +## Известные ограничения + +1. **Согласованность данных** - система не гарантирует абсолютную согласованность данных в кеше и базе данных. +2. **Память** - необходимо следить за объемом данных в кеше, чтобы избежать проблем с памятью Redis. +3. **Производительность Redis** - при большом количестве операций с кешем может стать узким местом. diff --git a/docs/comments-pagination.md b/docs/comments-pagination.md new file mode 100644 index 00000000..0f8f7261 --- /dev/null +++ b/docs/comments-pagination.md @@ -0,0 +1,165 @@ +# Пагинация комментариев + +## Обзор + +Реализована система пагинации комментариев по веткам, которая позволяет эффективно загружать и отображать вложенные ветки обсуждений. Основные преимущества: + +1. Загрузка только необходимых комментариев, а не всего дерева +2. Снижение нагрузки на сервер и клиент +3. Возможность эффективной навигации по большим обсуждениям +4. Предзагрузка первых N ответов для улучшения UX + +## API для иерархической загрузки комментариев + +### GraphQL запрос `load_comments_branch` + +```graphql +query LoadCommentsBranch( + $shout: Int!, + $parentId: Int, + $limit: Int, + $offset: Int, + $sort: ReactionSort, + $childrenLimit: Int, + $childrenOffset: Int +) { + load_comments_branch( + shout: $shout, + parent_id: $parentId, + limit: $limit, + offset: $offset, + sort: $sort, + children_limit: $childrenLimit, + children_offset: $childrenOffset + ) { + id + body + created_at + created_by { + id + name + slug + pic + } + kind + reply_to + stat { + rating + comments_count + } + first_replies { + id + body + created_at + created_by { + id + name + slug + pic + } + kind + reply_to + stat { + rating + comments_count + } + } + } +} +``` + +### Параметры запроса + +| Параметр | Тип | По умолчанию | Описание | +|----------|-----|--------------|----------| +| shout | Int! | - | ID статьи, к которой относятся комментарии | +| parent_id | Int | null | ID родительского комментария. Если null, загружаются корневые комментарии | +| limit | Int | 10 | Максимальное количество комментариев для загрузки | +| offset | Int | 0 | Смещение для пагинации | +| sort | ReactionSort | newest | Порядок сортировки: newest, oldest, like | +| children_limit | Int | 3 | Максимальное количество дочерних комментариев для каждого родительского | +| children_offset | Int | 0 | Смещение для пагинации дочерних комментариев | + +### Поля в ответе + +Каждый комментарий содержит следующие основные поля: + +- `id`: ID комментария +- `body`: Текст комментария +- `created_at`: Время создания +- `created_by`: Информация об авторе +- `kind`: Тип реакции (COMMENT) +- `reply_to`: ID родительского комментария (null для корневых) +- `first_replies`: Первые N дочерних комментариев +- `stat`: Статистика комментария, включающая: + - `comments_count`: Количество ответов на комментарий + - `rating`: Рейтинг комментария + +## Примеры использования + +### Загрузка корневых комментариев с первыми ответами + +```javascript +const { data } = await client.query({ + query: LOAD_COMMENTS_BRANCH, + variables: { + shout: 222, + limit: 10, + offset: 0, + sort: "newest", + childrenLimit: 3 + } +}); +``` + +### Загрузка ответов на конкретный комментарий + +```javascript +const { data } = await client.query({ + query: LOAD_COMMENTS_BRANCH, + variables: { + shout: 222, + parentId: 123, // ID комментария, для которого загружаем ответы + limit: 10, + offset: 0, + sort: "oldest" // Сортируем ответы от старых к новым + } +}); +``` + +### Пагинация дочерних комментариев + +Для загрузки дополнительных ответов на комментарий: + +```javascript +const { data } = await client.query({ + query: LOAD_COMMENTS_BRANCH, + variables: { + shout: 222, + parentId: 123, + limit: 10, + offset: 0, + childrenLimit: 5, + childrenOffset: 3 // Пропускаем первые 3 комментария (уже загруженные) + } +}); +``` + +## Рекомендации по клиентской реализации + +1. Для эффективной работы со сложными ветками обсуждений рекомендуется: + + - Сначала загружать только корневые комментарии с первыми N ответами + - При наличии дополнительных ответов (когда `stat.comments_count > first_replies.length`) + добавить кнопку "Показать все ответы" + - При нажатии на кнопку загружать дополнительные ответы с помощью запроса с указанным `parentId` + +2. Для сортировки: + - По умолчанию использовать `newest` для отображения свежих обсуждений + - Предусмотреть переключатель сортировки для всего дерева комментариев + - При изменении сортировки перезагружать данные с новым параметром `sort` + +3. Для улучшения производительности: + - Кешировать результаты запросов на клиенте + - Использовать оптимистичные обновления при добавлении/редактировании комментариев + - При необходимости загружать комментарии порциями (ленивая загрузка) \ No newline at end of file diff --git a/docs/features.md b/docs/features.md index 1970dbdb..4837f870 100644 --- a/docs/features.md +++ b/docs/features.md @@ -6,14 +6,20 @@ ## Мультидоменная авторизация -- Поддержка авторизации для разных доменов: - - *.dscrs.site (включая testing.dscrs.site) - - localhost[:port] - - testingdiscoursio-git-*-discoursio.vercel.app - - *.discours.io +- Поддержка авторизации для разных доменов - Автоматическое определение сервера авторизации - Корректная обработка CORS для всех поддерживаемых доменов +## Система кеширования + +- Redis используется в качестве основного механизма кеширования +- Поддержка как синхронных, так и асинхронных функций в декораторе cache_on_arguments +- Автоматическая сериализация/десериализация данных в JSON с использованием CustomJSONEncoder +- Резервная сериализация через pickle для сложных объектов +- Генерация уникальных ключей кеша на основе сигнатуры функции и переданных аргументов +- Настраиваемое время жизни кеша (TTL) +- Возможность ручной инвалидации кеша для конкретных функций и аргументов + ## Webhooks - Автоматическая регистрация вебхука для события user.login @@ -25,11 +31,18 @@ ## CORS Configuration -- Поддерживаются домены: - - *.dscrs.site (включая testing.dscrs.site, core.dscrs.site) - - *.discours.io (включая testing.discours.io) - - localhost (включая порты) - Поддерживаемые методы: GET, POST, OPTIONS - Настроена поддержка credentials - Разрешенные заголовки: Authorization, Content-Type, X-Requested-With, DNT, Cache-Control -- Настроено кэширование preflight-ответов на 20 дней (1728000 секунд) \ No newline at end of file +- Настроено кэширование preflight-ответов на 20 дней (1728000 секунд) + +## Пагинация комментариев по веткам + +- Эффективная загрузка комментариев с учетом их иерархической структуры +- Отдельный запрос `load_comments_branch` для оптимизированной загрузки ветки комментариев +- Возможность загрузки корневых комментариев статьи с первыми ответами на них +- Гибкая пагинация как для корневых, так и для дочерних комментариев +- Использование поля `stat.comments_count` для отображения количества ответов на комментарий +- Добавление специального поля `first_replies` для хранения первых ответов на комментарий +- Поддержка различных методов сортировки (новые, старые, популярные) +- Оптимизированные SQL запросы для минимизации нагрузки на базу данных \ No newline at end of file diff --git a/orm/author.py b/orm/author.py index c83b9d4f..4be2c630 100644 --- a/orm/author.py +++ b/orm/author.py @@ -1,6 +1,6 @@ import time -from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String +from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from services.db import Base @@ -8,6 +8,15 @@ from services.db import Base class AuthorRating(Base): + """ + Рейтинг автора от другого автора. + + Attributes: + rater (int): ID оценивающего автора + author (int): ID оцениваемого автора + plus (bool): Положительная/отрицательная оценка + """ + __tablename__ = "author_rating" id = None # type: ignore @@ -15,8 +24,26 @@ class AuthorRating(Base): author = Column(ForeignKey("author.id"), primary_key=True) plus = Column(Boolean) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех оценок конкретного автора + Index("idx_author_rating_author", "author"), + # Индекс для быстрого поиска всех оценок, оставленных конкретным автором + Index("idx_author_rating_rater", "rater"), + ) + class AuthorFollower(Base): + """ + Подписка одного автора на другого. + + Attributes: + follower (int): ID подписчика + author (int): ID автора, на которого подписываются + created_at (int): Время создания подписки + auto (bool): Признак автоматической подписки + """ + __tablename__ = "author_follower" id = None # type: ignore @@ -25,16 +52,57 @@ class AuthorFollower(Base): created_at = Column(Integer, nullable=False, default=lambda: int(time.time())) auto = Column(Boolean, nullable=False, default=False) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех подписчиков автора + Index("idx_author_follower_author", "author"), + # Индекс для быстрого поиска всех авторов, на которых подписан конкретный автор + Index("idx_author_follower_follower", "follower"), + ) + class AuthorBookmark(Base): + """ + Закладка автора на публикацию. + + Attributes: + author (int): ID автора + shout (int): ID публикации + """ + __tablename__ = "author_bookmark" id = None # type: ignore author = Column(ForeignKey("author.id"), primary_key=True) shout = Column(ForeignKey("shout.id"), primary_key=True) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех закладок автора + Index("idx_author_bookmark_author", "author"), + # Индекс для быстрого поиска всех авторов, добавивших публикацию в закладки + Index("idx_author_bookmark_shout", "shout"), + ) + class Author(Base): + """ + Модель автора в системе. + + Attributes: + user (str): Идентификатор пользователя в системе авторизации + name (str): Отображаемое имя + slug (str): Уникальный строковый идентификатор + bio (str): Краткая биография/статус + about (str): Полное описание + pic (str): URL изображения профиля + links (dict): Ссылки на социальные сети и сайты + created_at (int): Время создания профиля + last_seen (int): Время последнего посещения + updated_at (int): Время последнего обновления + deleted_at (int): Время удаления (если профиль удален) + """ + __tablename__ = "author" user = Column(String) # unbounded link with authorizer's User type @@ -53,3 +121,17 @@ class Author(Base): # search_vector = Column( # TSVectorType("name", "slug", "bio", "about", regconfig="pg_catalog.russian") # ) + + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска по slug + Index("idx_author_slug", "slug"), + # Индекс для быстрого поиска по идентификатору пользователя + Index("idx_author_user", "user"), + # Индекс для фильтрации неудаленных авторов + Index("idx_author_deleted_at", "deleted_at", postgresql_where=deleted_at.is_(None)), + # Индекс для сортировки по времени создания (для новых авторов) + Index("idx_author_created_at", "created_at"), + # Индекс для сортировки по времени последнего посещения + Index("idx_author_last_seen", "last_seen"), + ) diff --git a/orm/shout.py b/orm/shout.py index db352441..37734aca 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -1,6 +1,6 @@ import time -from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String +from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from sqlalchemy.orm import relationship from orm.author import Author @@ -10,6 +10,15 @@ from services.db import Base class ShoutTopic(Base): + """ + Связь между публикацией и темой. + + Attributes: + shout (int): ID публикации + topic (int): ID темы + main (bool): Признак основной темы + """ + __tablename__ = "shout_topic" id = None # type: ignore @@ -17,6 +26,12 @@ class ShoutTopic(Base): topic = Column(ForeignKey("topic.id"), primary_key=True, index=True) main = Column(Boolean, nullable=True) + # Определяем дополнительные индексы + __table_args__ = ( + # Оптимизированный составной индекс для запросов, которые ищут публикации по теме + Index("idx_shout_topic_topic_shout", "topic", "shout"), + ) + class ShoutReactionsFollower(Base): __tablename__ = "shout_reactions_followers" @@ -30,6 +45,15 @@ class ShoutReactionsFollower(Base): class ShoutAuthor(Base): + """ + Связь между публикацией и автором. + + Attributes: + shout (int): ID публикации + author (int): ID автора + caption (str): Подпись автора + """ + __tablename__ = "shout_author" id = None # type: ignore @@ -37,8 +61,18 @@ class ShoutAuthor(Base): author = Column(ForeignKey("author.id"), primary_key=True, index=True) caption = Column(String, nullable=True, default="") + # Определяем дополнительные индексы + __table_args__ = ( + # Оптимизированный индекс для запросов, которые ищут публикации по автору + Index("idx_shout_author_author_shout", "author", "shout"), + ) + class Shout(Base): + """ + Публикация в системе. + """ + __tablename__ = "shout" created_at: int = Column(Integer, nullable=False, default=lambda: int(time.time())) @@ -74,3 +108,20 @@ class Shout(Base): seo: str | None = Column(String, nullable=True) # JSON draft: int | None = Column(ForeignKey("draft.id"), nullable=True) + + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска неудаленных публикаций + Index("idx_shout_deleted_at", "deleted_at", postgresql_where=deleted_at.is_(None)), + # Индекс для быстрой фильтрации по community + Index("idx_shout_community", "community"), + # Индекс для быстрого поиска по slug + Index("idx_shout_slug", "slug"), + # Составной индекс для фильтрации опубликованных неудаленных публикаций + Index( + "idx_shout_published_deleted", + "published_at", + "deleted_at", + postgresql_where=published_at.is_not(None) & deleted_at.is_(None), + ), + ) diff --git a/orm/topic.py b/orm/topic.py index 61231fb3..4be1897d 100644 --- a/orm/topic.py +++ b/orm/topic.py @@ -1,11 +1,21 @@ import time -from sqlalchemy import JSON, Boolean, Column, ForeignKey, Integer, String +from sqlalchemy import JSON, Boolean, Column, ForeignKey, Index, Integer, String from services.db import Base class TopicFollower(Base): + """ + Связь между топиком и его подписчиком. + + Attributes: + follower (int): ID подписчика + topic (int): ID топика + created_at (int): Время создания связи + auto (bool): Автоматическая подписка + """ + __tablename__ = "topic_followers" id = None # type: ignore @@ -14,8 +24,29 @@ class TopicFollower(Base): created_at = Column(Integer, nullable=False, default=int(time.time())) auto = Column(Boolean, nullable=False, default=False) + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска всех подписчиков топика + Index("idx_topic_followers_topic", "topic"), + # Индекс для быстрого поиска всех топиков, на которые подписан автор + Index("idx_topic_followers_follower", "follower"), + ) + class Topic(Base): + """ + Модель топика (темы) публикаций. + + Attributes: + slug (str): Уникальный строковый идентификатор темы + title (str): Название темы + body (str): Описание темы + pic (str): URL изображения темы + community (int): ID сообщества + oid (str): Старый ID + parent_ids (list): IDs родительских тем + """ + __tablename__ = "topic" slug = Column(String, unique=True) @@ -24,5 +55,12 @@ class Topic(Base): pic = Column(String, nullable=True, comment="Picture") community = Column(ForeignKey("community.id"), default=1) oid = Column(String, nullable=True, comment="Old ID") - parent_ids = Column(JSON, nullable=True, comment="Parent Topic IDs") + + # Определяем индексы + __table_args__ = ( + # Индекс для быстрого поиска по slug + Index("idx_topic_slug", "slug"), + # Индекс для быстрого поиска по сообществу + Index("idx_topic_community", "community"), + ) diff --git a/requirements.dev.txt b/requirements.dev.txt new file mode 100644 index 00000000..fe95b9fb --- /dev/null +++ b/requirements.dev.txt @@ -0,0 +1,6 @@ +fakeredis +pytest +pytest-asyncio +pytest-cov +mypy +ruff diff --git a/requirements.txt b/requirements.txt index 40212b99..0f4d2f7f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,13 +2,10 @@ bcrypt authlib passlib - -google-analytics-data -dogpile-cache opensearch-py +google-analytics-data colorlog psycopg2-binary -dogpile-cache httpx redis[hiredis] sentry-sdk[starlette,sqlalchemy] @@ -20,10 +17,5 @@ granian # NLP and search httpx -pydantic -fakeredis -pytest -pytest-asyncio -pytest-cov -mypy -ruff +orjson +pydantic \ No newline at end of file diff --git a/resolvers/__init__.py b/resolvers/__init__.py index 4d2f8d69..699bc4c4 100644 --- a/resolvers/__init__.py +++ b/resolvers/__init__.py @@ -37,6 +37,7 @@ from resolvers.reaction import ( create_reaction, delete_reaction, load_comment_ratings, + load_comments_branch, load_reactions_by, load_shout_comments, load_shout_ratings, @@ -107,6 +108,7 @@ __all__ = [ "load_shout_comments", "load_shout_ratings", "load_comment_ratings", + "load_comments_branch", # notifier "load_notifications", "notifications_seen_thread", diff --git a/resolvers/author.py b/resolvers/author.py index 604f2bc6..91bad2e5 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -1,25 +1,196 @@ import asyncio import time +from typing import Optional -from sqlalchemy import desc, select, text +from sqlalchemy import select, text from cache.cache import ( cache_author, + cached_query, get_cached_author, get_cached_author_by_user_id, get_cached_author_followers, get_cached_follower_authors, get_cached_follower_topics, + invalidate_cache_by_prefix, ) from orm.author import Author -from orm.shout import ShoutAuthor, ShoutTopic -from orm.topic import Topic from resolvers.stat import get_with_stat from services.auth import login_required from services.db import local_session +from services.redis import redis from services.schema import mutation, query from utils.logger import root_logger as logger +DEFAULT_COMMUNITIES = [1] + + +# Вспомогательная функция для получения всех авторов без статистики +async def get_all_authors(): + """ + Получает всех авторов без статистики. + Используется для случаев, когда нужен полный список авторов без дополнительной информации. + + Returns: + list: Список всех авторов без статистики + """ + cache_key = "authors:all:basic" + + # Функция для получения всех авторов из БД + async def fetch_all_authors(): + logger.debug("Получаем список всех авторов из БД и кешируем результат") + + with local_session() as session: + # Запрос на получение базовой информации об авторах + authors_query = select(Author).where(Author.deleted_at.is_(None)) + authors = session.execute(authors_query).scalars().all() + + # Преобразуем авторов в словари + return [author.dict() for author in authors] + + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_all_authors) + + +# Вспомогательная функция для получения авторов со статистикой с пагинацией +async def get_authors_with_stats(limit=50, offset=0, by: Optional[str] = None): + """ + Получает авторов со статистикой с пагинацией. + + Args: + limit: Максимальное количество возвращаемых авторов + offset: Смещение для пагинации + by: Опциональный параметр сортировки (new/active) + + Returns: + list: Список авторов с их статистикой + """ + # Формируем ключ кеша с помощью универсальной функции + cache_key = f"authors:stats:limit={limit}:offset={offset}" + + # Функция для получения авторов из БД + async def fetch_authors_with_stats(): + logger.debug(f"Выполняем запрос на получение авторов со статистикой: limit={limit}, offset={offset}, by={by}") + + with local_session() as session: + # Базовый запрос для получения авторов + base_query = select(Author).where(Author.deleted_at.is_(None)) + + # Применяем сортировку + if by: + if isinstance(by, dict): + # Обработка словаря параметров сортировки + from sqlalchemy import asc, desc + + for field, direction in by.items(): + column = getattr(Author, field, None) + if column: + if direction.lower() == "desc": + base_query = base_query.order_by(desc(column)) + else: + base_query = base_query.order_by(column) + elif by == "new": + base_query = base_query.order_by(desc(Author.created_at)) + elif by == "active": + base_query = base_query.order_by(desc(Author.last_seen)) + else: + # По умолчанию сортируем по времени создания + base_query = base_query.order_by(desc(Author.created_at)) + else: + base_query = base_query.order_by(desc(Author.created_at)) + + # Применяем лимит и смещение + base_query = base_query.limit(limit).offset(offset) + + # Получаем авторов + authors = session.execute(base_query).scalars().all() + author_ids = [author.id for author in authors] + + if not author_ids: + return [] + + # Оптимизированный запрос для получения статистики по публикациям для авторов + shouts_stats_query = f""" + SELECT sa.author, COUNT(DISTINCT s.id) as shouts_count + FROM shout_author sa + JOIN shout s ON sa.shout = s.id AND s.deleted_at IS NULL AND s.published_at IS NOT NULL + WHERE sa.author IN ({",".join(map(str, author_ids))}) + GROUP BY sa.author + """ + shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))} + + # Запрос на получение статистики по подписчикам для авторов + followers_stats_query = f""" + SELECT author, COUNT(DISTINCT follower) as followers_count + FROM author_follower + WHERE author IN ({",".join(map(str, author_ids))}) + GROUP BY author + """ + followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))} + + # Формируем результат с добавлением статистики + result = [] + for author in authors: + author_dict = author.dict() + author_dict["stat"] = { + "shouts": shouts_stats.get(author.id, 0), + "followers": followers_stats.get(author.id, 0), + } + result.append(author_dict) + + # Кешируем каждого автора отдельно для использования в других функциях + await cache_author(author_dict) + + return result + + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_authors_with_stats) + + +# Функция для инвалидации кеша авторов +async def invalidate_authors_cache(author_id=None): + """ + Инвалидирует кеши авторов при изменении данных. + + Args: + author_id: Опциональный ID автора для точечной инвалидации. + Если не указан, инвалидируются все кеши авторов. + """ + if author_id: + # Точечная инвалидация конкретного автора + logger.debug(f"Инвалидация кеша для автора #{author_id}") + specific_keys = [ + f"author:id:{author_id}", + f"author:followers:{author_id}", + f"author:follows-authors:{author_id}", + f"author:follows-topics:{author_id}", + f"author:follows-shouts:{author_id}", + ] + + # Получаем user_id автора, если есть + with local_session() as session: + author = session.query(Author).filter(Author.id == author_id).first() + if author and author.user: + specific_keys.append(f"author:user:{author.user.strip()}") + + # Удаляем конкретные ключи + for key in specific_keys: + try: + await redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + except Exception as e: + logger.error(f"Ошибка при удалении ключа {key}: {e}") + + # Также ищем и удаляем ключи коллекций, содержащих данные об этом авторе + collection_keys = await redis.execute("KEYS", "authors:stats:*") + if collection_keys: + await redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей авторов") + else: + # Общая инвалидация всех кешей авторов + logger.debug("Полная инвалидация кеша авторов") + await invalidate_cache_by_prefix("authors") + @mutation.field("update_author") @login_required @@ -51,10 +222,14 @@ async def update_author(_, info, profile): @query.field("get_authors_all") -def get_authors_all(_, _info): - with local_session() as session: - authors = session.query(Author).all() - return authors +async def get_authors_all(_, _info): + """ + Получает список всех авторов без статистики. + + Returns: + list: Список всех авторов + """ + return await get_all_authors() @query.field("get_author") @@ -105,145 +280,105 @@ async def get_author_id(_, _info, user: str): asyncio.create_task(cache_author(author_dict)) return author_with_stat except Exception as exc: - import traceback - - traceback.print_exc() - logger.error(exc) + logger.error(f"Error getting author: {exc}") + return None @query.field("load_authors_by") async def load_authors_by(_, _info, by, limit, offset): - logger.debug(f"loading authors by {by}") - authors_query = select(Author) + """ + Загружает авторов по заданному критерию с пагинацией. - if by.get("slug"): - authors_query = authors_query.filter(Author.slug.ilike(f"%{by['slug']}%")) - elif by.get("name"): - authors_query = authors_query.filter(Author.name.ilike(f"%{by['name']}%")) - elif by.get("topic"): - authors_query = ( - authors_query.join(ShoutAuthor) # Первое соединение ShoutAuthor - .join(ShoutTopic, ShoutAuthor.shout == ShoutTopic.shout) - .join(Topic, ShoutTopic.topic == Topic.id) - .filter(Topic.slug == str(by["topic"])) - ) + Args: + by: Критерий сортировки авторов (new/active) + limit: Максимальное количество возвращаемых авторов + offset: Смещение для пагинации - if by.get("last_seen"): # в unix time - before = int(time.time()) - by["last_seen"] - authors_query = authors_query.filter(Author.last_seen > before) - elif by.get("created_at"): # в unix time - before = int(time.time()) - by["created_at"] - authors_query = authors_query.filter(Author.created_at > before) - - authors_query = authors_query.limit(limit).offset(offset) - - with local_session() as session: - authors_nostat = session.execute(authors_query).all() - authors = [] - for a in authors_nostat: - if isinstance(a, Author): - author_dict = await get_cached_author(a.id, get_with_stat) - if author_dict and isinstance(author_dict.get("shouts"), int): - authors.append(author_dict) - - # order - order = by.get("order") - if order in ["shouts", "followers"]: - authors_query = authors_query.order_by(desc(text(f"{order}_stat"))) - - # group by - authors = get_with_stat(authors_query) - return authors or [] + Returns: + list: Список авторов с учетом критерия + """ + # Используем оптимизированную функцию для получения авторов + return await get_authors_with_stats(limit, offset, by) def get_author_id_from(slug="", user=None, author_id=None): - if not slug and not user and not author_id: - raise ValueError("One of slug, user, or author_id must be provided") - - author_query = select(Author.id) - if user: - author_query = author_query.filter(Author.user == user) - elif slug: - author_query = author_query.filter(Author.slug == slug) - elif author_id: - author_query = author_query.filter(Author.id == author_id) - - with local_session() as session: - author_id_result = session.execute(author_query).first() - author_id = author_id_result[0] if author_id_result else None - - if not author_id: - raise ValueError("Author not found") - + try: + author_id = None + if author_id: + return author_id + with local_session() as session: + author = None + if slug: + author = session.query(Author).filter(Author.slug == slug).first() + if author: + author_id = author.id + return author_id + if user: + author = session.query(Author).filter(Author.user == user).first() + if author: + author_id = author.id + except Exception as exc: + logger.error(exc) return author_id @query.field("get_author_follows") async def get_author_follows(_, _info, slug="", user=None, author_id=0): - try: - author_id = get_author_id_from(slug, user, author_id) + logger.debug(f"getting follows for @{slug}") + author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) + if not author_id: + return {} - if bool(author_id): - logger.debug(f"getting {author_id} follows authors") - authors = await get_cached_follower_authors(author_id) - topics = await get_cached_follower_topics(author_id) - return { - "topics": topics, - "authors": authors, - "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], - } - except Exception: - import traceback + followed_authors = await get_cached_follower_authors(author_id) + followed_topics = await get_cached_follower_topics(author_id) - traceback.print_exc() - return {"error": "Author not found"} + # TODO: Get followed communities too + return { + "authors": followed_authors, + "topics": followed_topics, + "communities": DEFAULT_COMMUNITIES, + "shouts": [], + } @query.field("get_author_follows_topics") async def get_author_follows_topics(_, _info, slug="", user=None, author_id=None): - try: - follower_id = get_author_id_from(slug, user, author_id) - topics = await get_cached_follower_topics(follower_id) - return topics - except Exception: - import traceback - - traceback.print_exc() + logger.debug(f"getting followed topics for @{slug}") + author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) + if not author_id: + return [] + followed_topics = await get_cached_follower_topics(author_id) + return followed_topics @query.field("get_author_follows_authors") async def get_author_follows_authors(_, _info, slug="", user=None, author_id=None): - try: - follower_id = get_author_id_from(slug, user, author_id) - return await get_cached_follower_authors(follower_id) - except Exception: - import traceback - - traceback.print_exc() + logger.debug(f"getting followed authors for @{slug}") + author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) + if not author_id: + return [] + followed_authors = await get_cached_follower_authors(author_id) + return followed_authors def create_author(user_id: str, slug: str, name: str = ""): + author = Author() + author.user = user_id # Связь с user_id из системы авторизации + author.slug = slug # Идентификатор из системы авторизации + author.created_at = author.updated_at = int(time.time()) + author.name = name or slug # если не указано + with local_session() as session: - try: - author = None - if user_id: - author = session.query(Author).filter(Author.user == user_id).first() - elif slug: - author = session.query(Author).filter(Author.slug == slug).first() - if not author: - new_author = Author(user=user_id, slug=slug, name=name) - session.add(new_author) - session.commit() - logger.info(f"author created by webhook {new_author.dict()}") - except Exception as exc: - logger.debug(exc) + session.add(author) + session.commit() + return author @query.field("get_author_followers") async def get_author_followers(_, _info, slug: str = "", user: str = "", author_id: int = 0): - logger.debug(f"getting followers for @{slug}") + logger.debug(f"getting followers for author @{slug} or ID:{author_id}") author_id = get_author_id_from(slug=slug, user=user, author_id=author_id) - followers = [] - if author_id: - followers = await get_cached_author_followers(author_id) + if not author_id: + return [] + followers = await get_cached_author_followers(author_id) return followers diff --git a/resolvers/draft.py b/resolvers/draft.py index 4f7e0ead..0e04ffd8 100644 --- a/resolvers/draft.py +++ b/resolvers/draft.py @@ -1,4 +1,5 @@ import time +from operator import or_ from sqlalchemy.sql import and_ @@ -55,7 +56,11 @@ async def load_drafts(_, info): return {"error": "User ID and author ID are required"} with local_session() as session: - drafts = session.query(Draft).filter(Draft.authors.any(Author.id == author_id)).all() + drafts = ( + session.query(Draft) + .filter(or_(Draft.authors.any(Author.id == author_id), Draft.created_by == author_id)) + .all() + ) return {"drafts": drafts} @@ -96,7 +101,7 @@ async def create_draft(_, info, draft_input): # Проверяем обязательные поля if "body" not in draft_input or not draft_input["body"]: draft_input["body"] = "" # Пустая строка вместо NULL - + if "title" not in draft_input or not draft_input["title"]: draft_input["title"] = "" # Пустая строка вместо NULL @@ -120,24 +125,34 @@ async def create_draft(_, info, draft_input): @mutation.field("update_draft") @login_required -async def update_draft(_, info, draft_input): +async def update_draft(_, info, draft_id: int, draft_input): + """Обновляет черновик публикации. + + Args: + draft_id: ID черновика для обновления + draft_input: Данные для обновления черновика + + Returns: + dict: Обновленный черновик или сообщение об ошибке + """ user_id = info.context.get("user_id") author_dict = info.context.get("author", {}) author_id = author_dict.get("id") - draft_id = draft_input.get("id") - if not draft_id: - return {"error": "Draft ID is required"} + if not user_id or not author_id: return {"error": "Author ID are required"} with local_session() as session: draft = session.query(Draft).filter(Draft.id == draft_id).first() - del draft_input["id"] - Draft.update(draft, {**draft_input}) if not draft: return {"error": "Draft not found"} - draft.updated_at = int(time.time()) + Draft.update(draft, draft_input) + # Set updated_at and updated_by from the authenticated user + current_time = int(time.time()) + draft.updated_at = current_time + draft.updated_by = author_id + session.commit() return {"draft": draft} diff --git a/resolvers/editor.py b/resolvers/editor.py index 89c6e4b2..1efc40cc 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -1,6 +1,6 @@ -import json import time +import orjson from sqlalchemy import and_, desc, select from sqlalchemy.orm import joinedload from sqlalchemy.sql.functions import coalesce @@ -106,7 +106,7 @@ async def get_my_shout(_, info, shout_id: int): if hasattr(shout, "media") and shout.media: if isinstance(shout.media, str): try: - shout.media = json.loads(shout.media) + shout.media = orjson.loads(shout.media) except Exception as e: logger.error(f"Error parsing shout media: {e}") shout.media = [] diff --git a/resolvers/notifier.py b/resolvers/notifier.py index 5cd2fbcb..569f0f7a 100644 --- a/resolvers/notifier.py +++ b/resolvers/notifier.py @@ -1,7 +1,7 @@ -import json import time from typing import List, Tuple +import orjson from sqlalchemy import and_, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import aliased @@ -115,7 +115,7 @@ def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, o if (groups_amount + offset) >= limit: break - payload = json.loads(str(notification.payload)) + payload = orjson.loads(str(notification.payload)) if str(notification.entity) == NotificationEntity.SHOUT.value: shout = payload @@ -177,7 +177,7 @@ def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, o elif str(notification.entity) == "follower": thread_id = "followers" - follower = json.loads(payload) + follower = orjson.loads(payload) group = groups_by_thread.get(thread_id) if group: if str(notification.action) == "follow": @@ -293,11 +293,11 @@ async def notifications_seen_thread(_, info, thread: str, after: int): ) exclude = set() for nr in removed_reaction_notifications: - reaction = json.loads(str(nr.payload)) + reaction = orjson.loads(str(nr.payload)) reaction_id = reaction.get("id") exclude.add(reaction_id) for n in new_reaction_notifications: - reaction = json.loads(str(n.payload)) + reaction = orjson.loads(str(n.payload)) reaction_id = reaction.get("id") if ( reaction_id not in exclude diff --git a/resolvers/reaction.py b/resolvers/reaction.py index 3f448b96..4a3fe56a 100644 --- a/resolvers/reaction.py +++ b/resolvers/reaction.py @@ -67,50 +67,58 @@ def add_reaction_stat_columns(q): return q -def get_reactions_with_stat(q, limit, offset): +def get_reactions_with_stat(q, limit=10, offset=0): """ Execute the reaction query and retrieve reactions with statistics. :param q: Query with reactions and statistics. :param limit: Number of reactions to load. :param offset: Pagination offset. - :return: List of reactions. + :return: List of reactions as dictionaries. + + >>> get_reactions_with_stat(q, 10, 0) # doctest: +SKIP + [{'id': 1, 'body': 'Текст комментария', 'stat': {'rating': 5, 'comments_count': 3}, ...}] """ q = q.limit(limit).offset(offset) reactions = [] with local_session() as session: result_rows = session.execute(q) - for reaction, author, shout, commented_stat, rating_stat in result_rows: + for reaction, author, shout, comments_count, rating_stat in result_rows: # Пропускаем реакции с отсутствующими shout или author if not shout or not author: logger.error(f"Пропущена реакция из-за отсутствия shout или author: {reaction.dict()}") continue - reaction.created_by = author.dict() - reaction.shout = shout.dict() - reaction.stat = {"rating": rating_stat, "comments": commented_stat} - reactions.append(reaction) + # Преобразуем Reaction в словарь для доступа по ключу + reaction_dict = reaction.dict() + reaction_dict["created_by"] = author.dict() + reaction_dict["shout"] = shout.dict() + reaction_dict["stat"] = {"rating": rating_stat, "comments_count": comments_count} + reactions.append(reaction_dict) return reactions def is_featured_author(session, author_id) -> bool: """ - Check if an author has at least one featured article. + Check if an author has at least one non-deleted featured article. :param session: Database session. :param author_id: Author ID. :return: True if the author has a featured article, else False. """ return session.query( - session.query(Shout).where(Shout.authors.any(id=author_id)).filter(Shout.featured_at.is_not(None)).exists() + session.query(Shout) + .where(Shout.authors.any(id=author_id)) + .filter(Shout.featured_at.is_not(None), Shout.deleted_at.is_(None)) + .exists() ).scalar() def check_to_feature(session, approver_id, reaction) -> bool: """ - Make a shout featured if it receives more than 4 votes. + Make a shout featured if it receives more than 4 votes from authors. :param session: Database session. :param approver_id: Approver author ID. @@ -118,46 +126,78 @@ def check_to_feature(session, approver_id, reaction) -> bool: :return: True if shout should be featured, else False. """ if not reaction.reply_to and is_positive(reaction.kind): - approvers = {approver_id} - # Count the number of approvers + # Проверяем, не содержит ли пост более 20% дизлайков + # Если да, то не должен быть featured независимо от количества лайков + if check_to_unfeature(session, reaction): + return False + + # Собираем всех авторов, поставивших лайк + author_approvers = set() reacted_readers = ( session.query(Reaction.created_by) - .filter(Reaction.shout == reaction.shout, is_positive(Reaction.kind), Reaction.deleted_at.is_(None)) + .filter( + Reaction.shout == reaction.shout, + is_positive(Reaction.kind), + # Рейтинги (LIKE, DISLIKE) физически удаляются, поэтому фильтр deleted_at не нужен + ) .distinct() + .all() ) - for reader_id in reacted_readers: + # Добавляем текущего одобряющего + approver = session.query(Author).filter(Author.id == approver_id).first() + if approver and is_featured_author(session, approver_id): + author_approvers.add(approver_id) + + # Проверяем, есть ли у реагировавших авторов featured публикации + for (reader_id,) in reacted_readers: if is_featured_author(session, reader_id): - approvers.add(reader_id) - return len(approvers) > 4 + author_approvers.add(reader_id) + + # Публикация становится featured при наличии более 4 лайков от авторов + logger.debug(f"Публикация {reaction.shout} имеет {len(author_approvers)} лайков от авторов") + return len(author_approvers) > 4 return False -def check_to_unfeature(session, rejecter_id, reaction) -> bool: +def check_to_unfeature(session, reaction) -> bool: """ Unfeature a shout if 20% of reactions are negative. :param session: Database session. - :param rejecter_id: Rejecter author ID. :param reaction: Reaction object. :return: True if shout should be unfeatured, else False. """ - if not reaction.reply_to and is_negative(reaction.kind): + if not reaction.reply_to: + # Проверяем соотношение дизлайков, даже если текущая реакция не дизлайк total_reactions = ( session.query(Reaction) .filter( - Reaction.shout == reaction.shout, Reaction.kind.in_(RATING_REACTIONS), Reaction.deleted_at.is_(None) + Reaction.shout == reaction.shout, + Reaction.reply_to.is_(None), + Reaction.kind.in_(RATING_REACTIONS), + # Рейтинги физически удаляются при удалении, поэтому фильтр deleted_at не нужен ) .count() ) negative_reactions = ( session.query(Reaction) - .filter(Reaction.shout == reaction.shout, is_negative(Reaction.kind), Reaction.deleted_at.is_(None)) + .filter( + Reaction.shout == reaction.shout, + is_negative(Reaction.kind), + Reaction.reply_to.is_(None), + # Рейтинги физически удаляются при удалении, поэтому фильтр deleted_at не нужен + ) .count() ) - return total_reactions > 0 and (negative_reactions / total_reactions) >= 0.2 + # Проверяем, составляют ли отрицательные реакции 20% или более от всех реакций + negative_ratio = negative_reactions / total_reactions if total_reactions > 0 else 0 + logger.debug( + f"Публикация {reaction.shout}: {negative_reactions}/{total_reactions} отрицательных реакций ({negative_ratio:.2%})" + ) + return total_reactions > 0 and negative_ratio >= 0.2 return False @@ -196,8 +236,8 @@ async def _create_reaction(session, shout_id: int, is_author: bool, author_id: i Create a new reaction and perform related actions such as updating counters and notification. :param session: Database session. - :param info: GraphQL context info. - :param shout: Shout object. + :param shout_id: Shout ID. + :param is_author: Flag indicating if the user is the author of the shout. :param author_id: Author ID. :param reaction: Dictionary with reaction data. :return: Dictionary with created reaction data. @@ -217,10 +257,14 @@ async def _create_reaction(session, shout_id: int, is_author: bool, author_id: i # Handle rating if r.kind in RATING_REACTIONS: - if check_to_unfeature(session, author_id, r): + # Проверяем сначала условие для unfeature (дизлайки имеют приоритет) + if check_to_unfeature(session, r): set_unfeatured(session, shout_id) + logger.info(f"Публикация {shout_id} потеряла статус featured из-за высокого процента дизлайков") + # Только если не было unfeature, проверяем условие для feature elif check_to_feature(session, author_id, r): await set_featured(session, shout_id) + logger.info(f"Публикация {shout_id} получила статус featured благодаря лайкам от авторов") # Notify creation await notify_reaction(rdict, "create") @@ -354,7 +398,7 @@ async def update_reaction(_, info, reaction): result = session.execute(reaction_query).unique().first() if result: - r, author, shout, commented_stat, rating_stat = result + r, author, _shout, comments_count, rating_stat = result if not r or not author: return {"error": "Invalid reaction ID or unauthorized"} @@ -369,7 +413,7 @@ async def update_reaction(_, info, reaction): session.commit() r.stat = { - "commented": commented_stat, + "comments_count": comments_count, "rating": rating_stat, } @@ -406,15 +450,24 @@ async def delete_reaction(_, info, reaction_id: int): if r.created_by != author_id and "editor" not in roles: return {"error": "Access denied"} - logger.debug(f"{user_id} user removing his #{reaction_id} reaction") - reaction_dict = r.dict() - session.delete(r) - session.commit() - - # Update author stat if r.kind == ReactionKind.COMMENT.value: + r.deleted_at = int(time.time()) update_author_stat(author.id) + session.add(r) + session.commit() + elif r.kind == ReactionKind.PROPOSE.value: + r.deleted_at = int(time.time()) + session.add(r) + session.commit() + # TODO: add more reaction types here + else: + logger.debug(f"{user_id} user removing his #{reaction_id} reaction") + session.delete(r) + session.commit() + if check_to_unfeature(session, r): + set_unfeatured(session, r.shout) + reaction_dict = r.dict() await notify_reaction(reaction_dict, "delete") return {"error": None, "reaction": reaction_dict} @@ -485,7 +538,9 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0): # Add statistics and apply filters q = add_reaction_stat_columns(q) q = apply_reaction_filters(by, q) - q = q.where(Reaction.deleted_at.is_(None)) + + # Include reactions with deleted_at for building comment trees + # q = q.where(Reaction.deleted_at.is_(None)) # Group and sort q = q.group_by(Reaction.id, Author.id, Shout.id) @@ -562,24 +617,22 @@ async def load_shout_comments(_, info, shout: int, limit=50, offset=0): @query.field("load_comment_ratings") async def load_comment_ratings(_, info, comment: int, limit=50, offset=0): """ - Load ratings for a specified comment with pagination and statistics. + Load ratings for a specified comment with pagination. :param info: GraphQL context info. :param comment: Comment ID. :param limit: Number of ratings to load. :param offset: Pagination offset. - :return: List of reactions. + :return: List of ratings. """ q = query_reactions() - q = add_reaction_stat_columns(q) - # Filter, group, sort, limit, offset q = q.filter( and_( Reaction.deleted_at.is_(None), Reaction.reply_to == comment, - Reaction.kind == ReactionKind.COMMENT.value, + Reaction.kind.in_(RATING_REACTIONS), ) ) q = q.group_by(Reaction.id, Author.id, Shout.id) @@ -587,3 +640,187 @@ async def load_comment_ratings(_, info, comment: int, limit=50, offset=0): # Retrieve and return reactions return get_reactions_with_stat(q, limit, offset) + + +@query.field("load_comments_branch") +async def load_comments_branch( + _, + _info, + shout: int, + parent_id: int | None = None, + limit=10, + offset=0, + sort="newest", + children_limit=3, + children_offset=0, +): + """ + Загружает иерархические комментарии с возможностью пагинации корневых и дочерних. + + :param info: GraphQL context info. + :param shout: ID статьи. + :param parent_id: ID родительского комментария (None для корневых). + :param limit: Количество комментариев для загрузки. + :param offset: Смещение для пагинации. + :param sort: Порядок сортировки ('newest', 'oldest', 'like'). + :param children_limit: Максимальное количество дочерних комментариев. + :param children_offset: Смещение для дочерних комментариев. + :return: Список комментариев с дочерними. + """ + # Создаем базовый запрос + q = query_reactions() + q = add_reaction_stat_columns(q) + + # Фильтруем по статье и типу (комментарии) + q = q.filter( + and_( + Reaction.deleted_at.is_(None), + Reaction.shout == shout, + Reaction.kind == ReactionKind.COMMENT.value, + ) + ) + + # Фильтруем по родительскому ID + if parent_id is None: + # Загружаем только корневые комментарии + q = q.filter(Reaction.reply_to.is_(None)) + else: + # Загружаем только прямые ответы на указанный комментарий + q = q.filter(Reaction.reply_to == parent_id) + + # Сортировка и группировка + q = q.group_by(Reaction.id, Author.id, Shout.id) + + # Определяем сортировку + order_by_stmt = None + if sort.lower() == "oldest": + order_by_stmt = asc(Reaction.created_at) + elif sort.lower() == "like": + order_by_stmt = desc("rating_stat") + else: # "newest" по умолчанию + order_by_stmt = desc(Reaction.created_at) + + q = q.order_by(order_by_stmt) + + # Выполняем запрос для получения комментариев + comments = get_reactions_with_stat(q, limit, offset) + + # Если комментарии найдены, загружаем дочерние и количество ответов + if comments: + # Загружаем количество ответов для каждого комментария + await load_replies_count(comments) + + # Загружаем дочерние комментарии + await load_first_replies(comments, children_limit, children_offset, sort) + + return comments + + +async def load_replies_count(comments): + """ + Загружает количество ответов для списка комментариев и обновляет поле stat.comments_count. + + :param comments: Список комментариев, для которых нужно загрузить количество ответов. + """ + if not comments: + return + + comment_ids = [comment["id"] for comment in comments] + + # Запрос для подсчета количества ответов + q = ( + select(Reaction.reply_to.label("parent_id"), func.count().label("count")) + .where( + and_( + Reaction.reply_to.in_(comment_ids), + Reaction.deleted_at.is_(None), + Reaction.kind == ReactionKind.COMMENT.value, + ) + ) + .group_by(Reaction.reply_to) + ) + + # Выполняем запрос + with local_session() as session: + result = session.execute(q).fetchall() + + # Создаем словарь {parent_id: count} + replies_count = {row[0]: row[1] for row in result} + + # Добавляем значения в комментарии + for comment in comments: + if "stat" not in comment: + comment["stat"] = {} + + # Обновляем счетчик комментариев в stat + comment["stat"]["comments_count"] = replies_count.get(comment["id"], 0) + + +async def load_first_replies(comments, limit, offset, sort="newest"): + """ + Загружает первые N ответов для каждого комментария. + + :param comments: Список комментариев, для которых нужно загрузить ответы. + :param limit: Максимальное количество ответов для каждого комментария. + :param offset: Смещение для пагинации дочерних комментариев. + :param sort: Порядок сортировки ответов. + """ + if not comments or limit <= 0: + return + + # Собираем ID комментариев + comment_ids = [comment["id"] for comment in comments] + + # Базовый запрос для загрузки ответов + q = query_reactions() + q = add_reaction_stat_columns(q) + + # Фильтрация: только ответы на указанные комментарии + q = q.filter( + and_( + Reaction.reply_to.in_(comment_ids), + Reaction.deleted_at.is_(None), + Reaction.kind == ReactionKind.COMMENT.value, + ) + ) + + # Группировка + q = q.group_by(Reaction.id, Author.id, Shout.id) + + # Определяем сортировку + order_by_stmt = None + if sort.lower() == "oldest": + order_by_stmt = asc(Reaction.created_at) + elif sort.lower() == "like": + order_by_stmt = desc("rating_stat") + else: # "newest" по умолчанию + order_by_stmt = desc(Reaction.created_at) + + q = q.order_by(order_by_stmt, Reaction.reply_to) + + # Выполняем запрос - указываем limit для неограниченного количества ответов + # но не более 100 на родительский комментарий + replies = get_reactions_with_stat(q, limit=100, offset=0) + + # Группируем ответы по родительским ID + replies_by_parent = {} + for reply in replies: + parent_id = reply.get("reply_to") + if parent_id not in replies_by_parent: + replies_by_parent[parent_id] = [] + replies_by_parent[parent_id].append(reply) + + # Добавляем ответы к соответствующим комментариям с учетом смещения и лимита + for comment in comments: + comment_id = comment["id"] + if comment_id in replies_by_parent: + parent_replies = replies_by_parent[comment_id] + # Применяем смещение и лимит + comment["first_replies"] = parent_replies[offset : offset + limit] + else: + comment["first_replies"] = [] + + # Загружаем количество ответов для дочерних комментариев + all_replies = [reply for replies in replies_by_parent.values() for reply in replies] + if all_replies: + await load_replies_count(all_replies) diff --git a/resolvers/reader.py b/resolvers/reader.py index 91eddad9..f9cea228 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -1,5 +1,4 @@ -import json - +import orjson from graphql import GraphQLResolveInfo from sqlalchemy import and_, nulls_last, text from sqlalchemy.orm import aliased @@ -222,16 +221,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0): if has_field(info, "stat"): stat = {} if isinstance(row.stat, str): - stat = json.loads(row.stat) + stat = orjson.loads(row.stat) elif isinstance(row.stat, dict): stat = row.stat viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0 - shout_dict["stat"] = {**stat, "viewed": viewed, "commented": stat.get("comments_count", 0)} + shout_dict["stat"] = {**stat, "viewed": viewed} # Обработка main_topic и topics topics = None if has_field(info, "topics") and hasattr(row, "topics"): - topics = json.loads(row.topics) if isinstance(row.topics, str) else row.topics + topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics # logger.debug(f"Shout#{shout_id} topics: {topics}") shout_dict["topics"] = topics @@ -240,7 +239,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0): if hasattr(row, "main_topic"): # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}") main_topic = ( - json.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic + orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic ) # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}") @@ -260,7 +259,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0): if has_field(info, "authors") and hasattr(row, "authors"): shout_dict["authors"] = ( - json.loads(row.authors) if isinstance(row.authors, str) else row.authors + orjson.loads(row.authors) if isinstance(row.authors, str) else row.authors ) if has_field(info, "media") and shout.media: @@ -268,8 +267,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0): media_data = shout.media if isinstance(media_data, str): try: - media_data = json.loads(media_data) - except json.JSONDecodeError: + media_data = orjson.loads(media_data) + except orjson.JSONDecodeError: media_data = [] shout_dict["media"] = [media_data] if isinstance(media_data, dict) else media_data diff --git a/resolvers/topic.py b/resolvers/topic.py index d7460c36..c0f8836e 100644 --- a/resolvers/topic.py +++ b/resolvers/topic.py @@ -1,44 +1,223 @@ -from sqlalchemy import select +from sqlalchemy import desc, select, text from cache.cache import ( + cache_topic, + cached_query, get_cached_topic_authors, get_cached_topic_by_slug, get_cached_topic_followers, + invalidate_cache_by_prefix, ) -from cache.memorycache import cache_region from orm.author import Author from orm.topic import Topic from resolvers.stat import get_with_stat from services.auth import login_required from services.db import local_session +from services.redis import redis from services.schema import mutation, query from utils.logger import root_logger as logger +# Вспомогательная функция для получения всех тем без статистики +async def get_all_topics(): + """ + Получает все темы без статистики. + Используется для случаев, когда нужен полный список тем без дополнительной информации. + + Returns: + list: Список всех тем без статистики + """ + cache_key = "topics:all:basic" + + # Функция для получения всех тем из БД + async def fetch_all_topics(): + logger.debug("Получаем список всех тем из БД и кешируем результат") + + with local_session() as session: + # Запрос на получение базовой информации о темах + topics_query = select(Topic) + topics = session.execute(topics_query).scalars().all() + + # Преобразуем темы в словари + return [topic.dict() for topic in topics] + + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_all_topics) + + +# Вспомогательная функция для получения тем со статистикой с пагинацией +async def get_topics_with_stats(limit=100, offset=0, community_id=None, by=None): + """ + Получает темы со статистикой с пагинацией. + + Args: + limit: Максимальное количество возвращаемых тем + offset: Смещение для пагинации + community_id: Опциональный ID сообщества для фильтрации + by: Опциональный параметр сортировки + + Returns: + list: Список тем с их статистикой + """ + # Формируем ключ кеша с помощью универсальной функции + cache_key = f"topics:stats:limit={limit}:offset={offset}:community_id={community_id}" + + # Функция для получения тем из БД + async def fetch_topics_with_stats(): + logger.debug(f"Выполняем запрос на получение тем со статистикой: limit={limit}, offset={offset}") + + with local_session() as session: + # Базовый запрос для получения тем + base_query = select(Topic) + + # Добавляем фильтр по сообществу, если указан + if community_id: + base_query = base_query.where(Topic.community == community_id) + + # Применяем сортировку на основе параметра by + if by: + if isinstance(by, dict): + # Обработка словаря параметров сортировки + for field, direction in by.items(): + column = getattr(Topic, field, None) + if column: + if direction.lower() == "desc": + base_query = base_query.order_by(desc(column)) + else: + base_query = base_query.order_by(column) + elif by == "popular": + # Сортировка по популярности (количеству публикаций) + # Примечание: это требует дополнительного запроса или подзапроса + base_query = base_query.order_by( + desc(Topic.id) + ) # Временно, нужно заменить на proper implementation + else: + # По умолчанию сортируем по ID в обратном порядке + base_query = base_query.order_by(desc(Topic.id)) + else: + # По умолчанию сортируем по ID в обратном порядке + base_query = base_query.order_by(desc(Topic.id)) + + # Применяем лимит и смещение + base_query = base_query.limit(limit).offset(offset) + + # Получаем темы + topics = session.execute(base_query).scalars().all() + topic_ids = [topic.id for topic in topics] + + if not topic_ids: + return [] + + # Запрос на получение статистики по публикациям для выбранных тем + shouts_stats_query = f""" + SELECT st.topic, COUNT(DISTINCT s.id) as shouts_count + FROM shout_topic st + JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL + WHERE st.topic IN ({",".join(map(str, topic_ids))}) + GROUP BY st.topic + """ + shouts_stats = {row[0]: row[1] for row in session.execute(text(shouts_stats_query))} + + # Запрос на получение статистики по подписчикам для выбранных тем + followers_stats_query = f""" + SELECT topic, COUNT(DISTINCT follower) as followers_count + FROM topic_followers + WHERE topic IN ({",".join(map(str, topic_ids))}) + GROUP BY topic + """ + followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))} + + # Формируем результат с добавлением статистики + result = [] + for topic in topics: + topic_dict = topic.dict() + topic_dict["stat"] = { + "shouts": shouts_stats.get(topic.id, 0), + "followers": followers_stats.get(topic.id, 0), + } + result.append(topic_dict) + + # Кешируем каждую тему отдельно для использования в других функциях + await cache_topic(topic_dict) + + return result + + # Используем универсальную функцию для кеширования запросов + return await cached_query(cache_key, fetch_topics_with_stats) + + +# Функция для инвалидации кеша тем +async def invalidate_topics_cache(topic_id=None): + """ + Инвалидирует кеши тем при изменении данных. + + Args: + topic_id: Опциональный ID темы для точечной инвалидации. + Если не указан, инвалидируются все кеши тем. + """ + if topic_id: + # Точечная инвалидация конкретной темы + logger.debug(f"Инвалидация кеша для темы #{topic_id}") + specific_keys = [ + f"topic:id:{topic_id}", + f"topic:authors:{topic_id}", + f"topic:followers:{topic_id}", + f"topic_shouts_{topic_id}", + ] + + # Получаем slug темы, если есть + with local_session() as session: + topic = session.query(Topic).filter(Topic.id == topic_id).first() + if topic and topic.slug: + specific_keys.append(f"topic:slug:{topic.slug}") + + # Удаляем конкретные ключи + for key in specific_keys: + try: + await redis.execute("DEL", key) + logger.debug(f"Удален ключ кеша {key}") + except Exception as e: + logger.error(f"Ошибка при удалении ключа {key}: {e}") + + # Также ищем и удаляем ключи коллекций, содержащих данные об этой теме + collection_keys = await redis.execute("KEYS", "topics:stats:*") + if collection_keys: + await redis.execute("DEL", *collection_keys) + logger.debug(f"Удалено {len(collection_keys)} коллекционных ключей тем") + else: + # Общая инвалидация всех кешей тем + logger.debug("Полная инвалидация кеша тем") + await invalidate_cache_by_prefix("topics") + + # Запрос на получение всех тем @query.field("get_topics_all") -def get_topics_all(_, _info): - cache_key = "get_topics_all" # Ключ для кеша +async def get_topics_all(_, _info): + """ + Получает список всех тем без статистики. - @cache_region.cache_on_arguments(cache_key) - def _get_topics_all(): - topics_query = select(Topic) - return get_with_stat(topics_query) # Получение тем с учетом статистики - - return _get_topics_all() + Returns: + list: Список всех тем + """ + return await get_all_topics() # Запрос на получение тем по сообществу @query.field("get_topics_by_community") -def get_topics_by_community(_, _info, community_id: int): - cache_key = f"get_topics_by_community_{community_id}" # Ключ для кеша +async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0, by=None): + """ + Получает список тем, принадлежащих указанному сообществу с пагинацией и статистикой. - @cache_region.cache_on_arguments(cache_key) - def _get_topics_by_community(): - topics_by_community_query = select(Topic).where(Topic.community == community_id) - return get_with_stat(topics_by_community_query) + Args: + community_id: ID сообщества + limit: Максимальное количество возвращаемых тем + offset: Смещение для пагинации + by: Опциональные параметры сортировки - return _get_topics_by_community() + Returns: + list: Список тем с их статистикой + """ + return await get_topics_with_stats(limit, offset, community_id, by) # Запрос на получение тем по автору @@ -74,6 +253,9 @@ async def create_topic(_, _info, topic_input): session.add(new_topic) session.commit() + # Инвалидируем кеш всех тем + await invalidate_topics_cache() + return {"topic": new_topic} @@ -87,10 +269,19 @@ async def update_topic(_, _info, topic_input): if not topic: return {"error": "topic not found"} else: + old_slug = topic.slug Topic.update(topic, topic_input) session.add(topic) session.commit() + # Инвалидируем кеш только для этой конкретной темы + await invalidate_topics_cache(topic.id) + + # Если slug изменился, удаляем старый ключ + if old_slug != topic.slug: + await redis.execute("DEL", f"topic:slug:{old_slug}") + logger.debug(f"Удален ключ кеша для старого slug: {old_slug}") + return {"topic": topic} @@ -111,6 +302,11 @@ async def delete_topic(_, info, slug: str): session.delete(t) session.commit() + # Инвалидируем кеш всех тем и конкретной темы + await invalidate_topics_cache() + await redis.execute("DEL", f"topic:slug:{slug}") + await redis.execute("DEL", f"topic:id:{t.id}") + return {} return {"error": "access denied"} diff --git a/schema/query.graphql b/schema/query.graphql index f39fba5c..e07954ae 100644 --- a/schema/query.graphql +++ b/schema/query.graphql @@ -26,6 +26,9 @@ type Query { load_shout_ratings(shout: Int!, limit: Int, offset: Int): [Reaction] load_comment_ratings(comment: Int!, limit: Int, offset: Int): [Reaction] + # branched comments pagination + load_comments_branch(shout: Int!, parent_id: Int, limit: Int, offset: Int, sort: ReactionSort, children_limit: Int, children_offset: Int): [Reaction] + # reader get_shout(slug: String, shout_id: Int): Shout load_shouts_by(options: LoadShoutsOptions): [Shout] @@ -57,7 +60,7 @@ type Query { get_topic(slug: String!): Topic get_topics_all: [Topic] get_topics_by_author(slug: String, user: String, author_id: Int): [Topic] - get_topics_by_community(slug: String, community_id: Int): [Topic] + get_topics_by_community(community_id: Int!, limit: Int, offset: Int): [Topic] # notifier load_notifications(after: Int!, limit: Int, offset: Int): NotificationsResult! diff --git a/schema/type.graphql b/schema/type.graphql index b4d4ba50..17d0c84b 100644 --- a/schema/type.graphql +++ b/schema/type.graphql @@ -55,6 +55,7 @@ type Reaction { stat: Stat oid: String # old_thread: String + first_replies: [Reaction] } type MediaItem { @@ -136,7 +137,7 @@ type Draft { type Stat { rating: Int - commented: Int + comments_count: Int viewed: Int last_commented_at: Int } diff --git a/services/db.py b/services/db.py index 01e3961c..ccca6c8a 100644 --- a/services/db.py +++ b/services/db.py @@ -1,21 +1,23 @@ -import json import math import time import traceback import warnings from typing import Any, Callable, Dict, TypeVar +import orjson import sqlalchemy from sqlalchemy import ( JSON, Column, Engine, + Index, Integer, create_engine, event, exc, func, inspect, + text, ) from sqlalchemy.orm import Session, configure_mappers, declarative_base from sqlalchemy.sql.schema import Table @@ -56,6 +58,82 @@ def create_table_if_not_exists(engine, table): logger.info(f"Table '{table.__tablename__}' ok.") +def sync_indexes(): + """ + Синхронизирует индексы в БД с индексами, определенными в моделях SQLAlchemy. + Создает недостающие индексы, если они определены в моделях, но отсутствуют в БД. + + Использует pg_catalog для PostgreSQL для получения списка существующих индексов. + """ + if not DB_URL.startswith("postgres"): + logger.warning("Функция sync_indexes поддерживается только для PostgreSQL.") + return + + logger.info("Начинаем синхронизацию индексов в базе данных...") + + # Получаем все существующие индексы в БД + with local_session() as session: + existing_indexes_query = text(""" + SELECT + t.relname AS table_name, + i.relname AS index_name + FROM + pg_catalog.pg_class i + JOIN + pg_catalog.pg_index ix ON ix.indexrelid = i.oid + JOIN + pg_catalog.pg_class t ON t.oid = ix.indrelid + JOIN + pg_catalog.pg_namespace n ON n.oid = i.relnamespace + WHERE + i.relkind = 'i' + AND n.nspname = 'public' + AND t.relkind = 'r' + ORDER BY + t.relname, i.relname; + """) + + existing_indexes = {row[1].lower() for row in session.execute(existing_indexes_query)} + logger.debug(f"Найдено {len(existing_indexes)} существующих индексов в БД") + + # Проверяем каждую модель и её индексы + for _model_name, model_class in REGISTRY.items(): + if hasattr(model_class, "__table__") and hasattr(model_class, "__table_args__"): + table_args = model_class.__table_args__ + + # Если table_args - это кортеж, ищем в нём объекты Index + if isinstance(table_args, tuple): + for arg in table_args: + if isinstance(arg, Index): + index_name = arg.name.lower() + + # Проверяем, существует ли индекс в БД + if index_name not in existing_indexes: + logger.info( + f"Создаем отсутствующий индекс {index_name} для таблицы {model_class.__tablename__}" + ) + + # Создаем индекс если он отсутствует + try: + arg.create(engine) + logger.info(f"Индекс {index_name} успешно создан") + except Exception as e: + logger.error(f"Ошибка при создании индекса {index_name}: {e}") + else: + logger.debug(f"Индекс {index_name} уже существует") + + # Анализируем таблицы для оптимизации запросов + for model_name, model_class in REGISTRY.items(): + if hasattr(model_class, "__tablename__"): + try: + session.execute(text(f"ANALYZE {model_class.__tablename__}")) + logger.debug(f"Таблица {model_class.__tablename__} проанализирована") + except Exception as e: + logger.error(f"Ошибка при анализе таблицы {model_class.__tablename__}: {e}") + + logger.info("Синхронизация индексов завершена.") + + # noinspection PyUnusedLocal def local_session(src=""): return Session(bind=engine, expire_on_commit=False) @@ -84,8 +162,8 @@ class Base(declarative_base()): # Check if the value is JSON and decode it if necessary if isinstance(value, (str, bytes)) and isinstance(self.__table__.columns[column_name].type, JSON): try: - data[column_name] = json.loads(value) - except (TypeError, json.JSONDecodeError) as e: + data[column_name] = orjson.loads(value) + except (TypeError, orjson.JSONDecodeError) as e: logger.error(f"Error decoding JSON for column '{column_name}': {e}") data[column_name] = value else: diff --git a/services/notify.py b/services/notify.py index 626afa7b..911bd6ec 100644 --- a/services/notify.py +++ b/services/notify.py @@ -1,4 +1,4 @@ -import json +import orjson from orm.notification import Notification from services.db import local_session @@ -18,7 +18,7 @@ async def notify_reaction(reaction, action: str = "create"): data = {"payload": reaction, "action": action} try: save_notification(action, channel_name, data.get("payload")) - await redis.publish(channel_name, json.dumps(data)) + await redis.publish(channel_name, orjson.dumps(data)) except Exception as e: logger.error(f"Failed to publish to channel {channel_name}: {e}") @@ -28,7 +28,7 @@ async def notify_shout(shout, action: str = "update"): data = {"payload": shout, "action": action} try: save_notification(action, channel_name, data.get("payload")) - await redis.publish(channel_name, json.dumps(data)) + await redis.publish(channel_name, orjson.dumps(data)) except Exception as e: logger.error(f"Failed to publish to channel {channel_name}: {e}") @@ -43,7 +43,7 @@ async def notify_follower(follower: dict, author_id: int, action: str = "follow" save_notification(action, channel_name, data.get("payload")) # Convert data to JSON string - json_data = json.dumps(data) + json_data = orjson.dumps(data) # Ensure the data is not empty before publishing if json_data: diff --git a/services/viewed.py b/services/viewed.py index f1942de0..a388ea0b 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -1,10 +1,11 @@ import asyncio -import json import os import time from datetime import datetime, timedelta, timezone from typing import Dict +import orjson + # ga from google.analytics.data_v1beta import BetaAnalyticsDataClient from google.analytics.data_v1beta.types import ( @@ -84,7 +85,7 @@ class ViewedStorage: logger.warn(f" * {viewfile_path} is too old: {self.start_date}") with open(viewfile_path, "r") as file: - precounted_views = json.load(file) + precounted_views = orjson.loads(file.read()) self.precounted_by_slug.update(precounted_views) logger.info(f" * {len(precounted_views)} shouts with views was loaded.") diff --git a/settings.py b/settings.py index 5567e60e..6453b9e3 100644 --- a/settings.py +++ b/settings.py @@ -1,18 +1,24 @@ import sys from os import environ -PORT = 8000 +MODE = "development" if "dev" in sys.argv else "production" +DEV_SERVER_PID_FILE_NAME = "dev-server.pid" + +PORT = environ.get("PORT") or 8000 + +# storages DB_URL = ( environ.get("DATABASE_URL", "").replace("postgres://", "postgresql://") or environ.get("DB_URL", "").replace("postgres://", "postgresql://") or "sqlite:///discoursio.db" ) REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1" -AUTH_URL = environ.get("AUTH_URL") or "" -GLITCHTIP_DSN = environ.get("GLITCHTIP_DSN") -DEV_SERVER_PID_FILE_NAME = "dev-server.pid" -MODE = "development" if "dev" in sys.argv else "production" +# debug +GLITCHTIP_DSN = environ.get("GLITCHTIP_DSN") + +# authorizer.dev +AUTH_URL = environ.get("AUTH_URL") or "https://auth.discours.io/graphql" ADMIN_SECRET = environ.get("AUTH_SECRET") or "nothing" WEBHOOK_SECRET = environ.get("WEBHOOK_SECRET") or "nothing-else" diff --git a/utils/encoders.py b/utils/encoders.py index fe4c97d4..e93cc763 100644 --- a/utils/encoders.py +++ b/utils/encoders.py @@ -1,9 +1,28 @@ -import json from decimal import Decimal +from json import JSONEncoder -class CustomJSONEncoder(json.JSONEncoder): +class CustomJSONEncoder(JSONEncoder): + """ + Расширенный JSON энкодер с поддержкой сериализации объектов SQLAlchemy. + + Примеры: + >>> import json + >>> from decimal import Decimal + >>> from orm.topic import Topic + >>> json.dumps(Decimal("10.50"), cls=CustomJSONEncoder) + '"10.50"' + >>> topic = Topic(id=1, slug="test") + >>> json.dumps(topic, cls=CustomJSONEncoder) + '{"id": 1, "slug": "test", ...}' + """ + def default(self, obj): if isinstance(obj, Decimal): return str(obj) + + # Проверяем, есть ли у объекта метод dict() (как у моделей SQLAlchemy) + if hasattr(obj, "dict") and callable(obj.dict): + return obj.dict() + return super().default(obj) diff --git a/utils/logger.py b/utils/logger.py index b49263d4..ef5bce9d 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -13,7 +13,7 @@ def filter(record: logging.LogRecord): record.emoji = ( "🔍" if record.levelno == logging.DEBUG - else "🖊️" + else "ℹ︎" if record.levelno == logging.INFO else "🚧" if record.levelno == logging.WARNING