diff --git a/resolvers/reader.py b/resolvers/reader.py
index e9e2d207..de81b81a 100644
--- a/resolvers/reader.py
+++ b/resolvers/reader.py
@@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic
 from orm.topic import Topic
 from services.db import json_array_builder, json_builder, local_session
 from services.schema import query
-from services.search import search_text
+from services.search import search_text, search_text_paginated
 from services.viewed import ViewedStorage
 from utils.logger import root_logger as logger
 
@@ -399,10 +399,17 @@ async def load_shouts_search(_, info, text, options):
     :param options: Filtering and sorting options.
     :return: List of publications found for the search text.
     """
-    limit = options.get("limit", 10)
+    limit = options.get("limit", 20)
     offset = options.get("offset", 0)
+    full_limit = options.get("full_limit", 100)  # Maximum results to fetch
+
     if isinstance(text, str) and len(text) > 2:
-        results = await search_text(text, limit, offset)
+        # Use the new paginated search function
+        results, total_results = await search_text_paginated(text, limit, offset, full_limit)
+
+        # Log the total hit count and the slice being returned
+        logger.info(f"Search '{text}' found {total_results} total results, returning {len(results)} from offset {offset}")
+
         scores = {}
         hits_ids = []
         for sr in results:
@@ -412,17 +419,29 @@ async def load_shouts_search(_, info, text, options):
             scores[shout_id] = sr.get("score")
             hits_ids.append(shout_id)
 
+        if not hits_ids:
+            # Nothing matched: return an empty page together with the total count
+            return {"items": [], "total_count": total_results}
+
         q = query_with_stat(info)
         q = q.filter(Shout.id.in_(hits_ids))
         q = apply_filters(q, options)
         shouts = get_shouts_with_links(info, q, limit, offset)
+
+        # Attach the relevance score to each shout
         for shout in shouts:
-            shout["score"] = scores[f"{shout['id']}"]
-        shouts.sort(key=lambda x: x["score"], reverse=True)
-        return shouts
-    return []
+            shout_id = f"{shout['id']}"
+            if shout_id in scores:
+                shout["score"] = scores[shout_id]
+
+        # Sort by search relevance score
+        shouts.sort(key=lambda x: x.get("score", 0), reverse=True)
+
+        # Return the page together with the total count
+        return {"items": shouts, "total_count": total_results}
+
+    return {"items": [], "total_count": 0}
 
 
 @query.field("load_shouts_unrated")
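Note on the resolver change above: load_shouts_search now returns a {"items": [...], "total_count": N} mapping instead of a bare list, so the GraphQL return type for this query and any frontend pagination logic will need to match the new shape (that schema change is not shown in this diff). A minimal sketch of what a client gains from total_count; the page_offsets helper is hypothetical and not part of this change:

    # Hypothetical helper, not part of the diff: derive the offsets a client
    # would request to page through `total_count` results.
    def page_offsets(total_count: int, limit: int = 20) -> list[int]:
        return list(range(0, total_count, limit))

    # e.g. a search reporting total_count=57 with limit=20 needs offsets 0, 20 and 40
    assert page_offsets(57) == [0, 20, 40]
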
diff --git a/services/search.py b/services/search.py
index 40211b8b..fd9ac6e0 100644
--- a/services/search.py
+++ b/services/search.py
@@ -14,12 +14,15 @@ logger.setLevel(logging.INFO)  # Change to INFO to see more details
 SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
 TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
 MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
+SEARCH_CACHE_SIZE = int(os.environ.get("SEARCH_CACHE_SIZE", "50"))  # Maximum number of cached search queries
+SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL", "300"))  # Seconds to keep search results in cache
 
 
 class SearchService:
     def __init__(self):
         logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
         self.available = SEARCH_ENABLED
+        self._search_cache = {}  # Cache structure: {normalized_query: (timestamp, results)}
         # Use different timeout settings for indexing and search requests
         self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
         self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
@@ -416,6 +419,77 @@
             logger.error(f"Search error for '{text}': {e}", exc_info=True)
             return []
 
+    async def search_with_cache(self, text, full_limit=100, return_limit=20, offset=0):
+        """
+        Search documents with caching:
+        - fetches full_limit results from the search service
+        - caches them with a TTL
+        - returns only return_limit results starting at offset
+
+        Returns tuple: (results_slice, total_results)
+        """
+        if not self.available:
+            logger.warning("Search not available")
+            return [], 0
+
+        if not isinstance(text, str) or not text.strip():
+            logger.warning(f"Invalid search text: {text}")
+            return [], 0
+
+        # The cache key is the normalized query text
+        cache_key = text.strip().lower()
+        current_time = time.time()
+
+        # Check if we have cached results
+        if cache_key in self._search_cache:
+            timestamp, cached_results = self._search_cache[cache_key]
+            # Check if the cache entry is still valid
+            if current_time - timestamp < SEARCH_CACHE_TTL:
+                logger.info(f"Using cached results for '{text}', total: {len(cached_results)}")
+
+                # Calculate the slice to return
+                end_offset = offset + return_limit
+                if end_offset > len(cached_results):
+                    end_offset = len(cached_results)
+
+                if offset >= len(cached_results):
+                    return [], len(cached_results)  # Empty page: offset is past the cached results
+
+                return cached_results[offset:end_offset], len(cached_results)
+
+        # No cache hit (or the entry expired): perform the search
+        try:
+            logger.info(f"Fetching up to {full_limit} results for '{text}'")
+            full_results = await self.search(text, full_limit, 0)  # fetch the full window from offset 0
+
+            # Cache the results
+            self._search_cache[cache_key] = (current_time, full_results)
+
+            # Clean up old cache entries if the cache is too large
+            if len(self._search_cache) > SEARCH_CACHE_SIZE:
+                # Remove the oldest entries
+                oldest_keys = sorted(
+                    self._search_cache.keys(),
+                    key=lambda k: self._search_cache[k][0]
+                )[:len(self._search_cache) - SEARCH_CACHE_SIZE]
+
+                for k in oldest_keys:
+                    del self._search_cache[k]
+
+            # Calculate the slice to return
+            end_offset = offset + return_limit
+            if end_offset > len(full_results):
+                end_offset = len(full_results)
+
+            if offset >= len(full_results):
+                return [], len(full_results)  # Empty page: offset is past the fetched results
+
+            return full_results[offset:end_offset], len(full_results)
+
+        except Exception as e:
+            logger.error(f"Search with cache error for '{text}': {e}", exc_info=True)
+            return [], 0
+
     async def check_index_status(self):
         """Get detailed statistics about the search index health"""
         if not self.available:
@@ -450,6 +524,26 @@ async def search_text(text: str, limit: int = 50, offset: int = 0):
         payload = await search_service.search(text, limit, offset)
     return payload
 
+
+# New function to support pagination from cached results
+async def search_text_paginated(text: str, return_limit: int = 20, offset: int = 0, full_limit: int = 100):
+    """
+    Search with pagination support, backed by the in-process result cache.
+
+    Args:
+        text: The search query
+        return_limit: How many results to return for the current page
+        offset: Starting offset for pagination
+        full_limit: Maximum number of results to fetch and cache
+
+    Returns:
+        tuple: (results_for_page, total_results_count)
+    """
+    results = []
+    total = 0
+    if search_service.available:
+        results, total = await search_service.search_with_cache(text, full_limit, return_limit, offset)
+    return results, total
+
 
 async def initialize_search_index(shouts_data):
     """Initialize search index with existing data during application startup"""
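A usage sketch of the new helper, assuming SEARCH_ENABLED is set and a txtai service is reachable at TXTAI_SERVICE_URL; the query string and the runner script are illustrative only. The second page of the same query is expected to be sliced from the in-process cache as long as it is requested within SEARCH_CACHE_TTL seconds:

    import asyncio

    from services.search import search_text_paginated

    async def demo():
        # First call fetches up to full_limit hits from the search service and caches them
        page1, total = await search_text_paginated("культура", return_limit=20, offset=0, full_limit=100)
        # Second page for the same query text is served from the cached result set
        page2, _ = await search_text_paginated("культура", return_limit=20, offset=20, full_limit=100)
        print(f"total={total}, page 1: {len(page1)} hits, page 2: {len(page2)} hits")

    asyncio.run(demo())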