feat(reader,search,graphql): add pagination (test only)

Stepan Vladovskiy 2025-04-01 13:57:26 -03:00
parent ecc443c3ad
commit e6adb143fb
2 changed files with 121 additions and 8 deletions


@@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic
 from orm.topic import Topic
 from services.db import json_array_builder, json_builder, local_session
 from services.schema import query
-from services.search import search_text
+from services.search import search_text, search_text_paginated
 from services.viewed import ViewedStorage
 from utils.logger import root_logger as logger
@@ -399,10 +399,17 @@ async def load_shouts_search(_, info, text, options):
     :param options: Filtering and sorting options.
     :return: List of publications found for the text.
     """
-    limit = options.get("limit", 10)
+    limit = options.get("limit", 20)
     offset = options.get("offset", 0)
+    full_limit = options.get("full_limit", 100)  # Maximum results to fetch
+
     if isinstance(text, str) and len(text) > 2:
-        results = await search_text(text, limit, offset)
+        # Use the new paginated search function
+        results, total_results = await search_text_paginated(text, limit, offset, full_limit)
+
+        # Log the total count (exposed to the frontend via total_count)
+        logger.info(f"Search '{text}' found {total_results} total results, returning {len(results)} from offset {offset}")
+
         scores = {}
         hits_ids = []
         for sr in results:
@@ -412,17 +419,29 @@ async def load_shouts_search(_, info, text, options):
             scores[shout_id] = sr.get("score")
             hits_ids.append(shout_id)
 
         if not hits_ids:
-            return []
+            # Return an empty list with total count info
+            return {"items": [], "total_count": total_results}
 
         q = query_with_stat(info)
         q = q.filter(Shout.id.in_(hits_ids))
         q = apply_filters(q, options)
 
         shouts = get_shouts_with_links(info, q, limit, offset)
+
+        # Add score to each shout
         for shout in shouts:
-            shout["score"] = scores[f"{shout['id']}"]
-        shouts.sort(key=lambda x: x["score"], reverse=True)
-        return shouts
-    return []
+            shout_id = f"{shout['id']}"
+            if shout_id in scores:
+                shout["score"] = scores[shout_id]
+
+        # Sort by search relevance score
+        shouts.sort(key=lambda x: x.get("score", 0), reverse=True)
+
+        # Return with total count information
+        return {"items": shouts, "total_count": total_results}
+
+    return {"items": [], "total_count": 0}
@query.field("load_shouts_unrated")
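Since load_shouts_search now returns a dict with items and total_count instead of a bare list, the GraphQL result type has to change to match; that schema change is not part of this diff. A minimal sketch of how a client might page through the new shape, assuming a hypothetical run_query helper that executes the GraphQL operation (names are illustrative, not part of this commit):

    # Paging sketch for the new response shape (illustrative; `run_query`
    # is a hypothetical GraphQL client helper, not part of this commit).
    async def fetch_all_search_pages(run_query, text: str, page_size: int = 20):
        """Collect every page of load_shouts_search results."""
        offset, items = 0, []
        while True:
            payload = await run_query(
                "load_shouts_search",
                {"text": text, "options": {"limit": page_size, "offset": offset}},
            )
            items.extend(payload["items"])
            offset += page_size
            # total_count covers all cached hits, not just the current page
            if not payload["items"] or offset >= payload["total_count"]:
                break
        return items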


@@ -14,12 +14,15 @@ logger.setLevel(logging.INFO)  # Change to INFO to see more details
 SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
 TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
 MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
+SEARCH_CACHE_SIZE = int(os.environ.get("SEARCH_CACHE_SIZE", "50"))  # Max number of cached search queries
+SEARCH_CACHE_TTL = int(os.environ.get("SEARCH_CACHE_TTL", "300"))  # Seconds to keep search results in cache
 
 
 class SearchService:
     def __init__(self):
         logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
         self.available = SEARCH_ENABLED
+        self._search_cache = {}  # Cache structure: {normalized_query_text: (timestamp, results)}
         # Use different timeout settings for indexing and search requests
         self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
         self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
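Both cache knobs are read from the environment at module import time, so for local testing they must be overridden before services.search is first imported. A minimal sketch (the values are illustrative):

    import os

    # Override the cache knobs before services.search is imported, since the
    # module reads SEARCH_CACHE_SIZE / SEARCH_CACHE_TTL at import time
    # (the values here are illustrative).
    os.environ["SEARCH_CACHE_SIZE"] = "10"  # keep at most 10 cached queries
    os.environ["SEARCH_CACHE_TTL"] = "60"   # expire cached results after 60 seconds

    from services.search import search_text_paginated  # noqa: E402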
@@ -416,6 +419,77 @@ class SearchService:
             logger.error(f"Search error for '{text}': {e}", exc_info=True)
             return []
 
+    async def search_with_cache(self, text, full_limit=100, return_limit=20, offset=0):
+        """
+        Search documents with caching:
+        - fetches up to full_limit results from the search service,
+        - caches them with a TTL,
+        - returns only return_limit results starting at offset.
+
+        Returns tuple: (results_slice, total_results)
+        """
+        if not self.available:
+            logger.warning("Search not available")
+            return [], 0
+
+        if not isinstance(text, str) or not text.strip():
+            logger.warning(f"Invalid search text: {text}")
+            return [], 0
+
+        # Generate cache key from the normalized query text
+        cache_key = text.strip().lower()
+        current_time = time.time()
+
+        # Check for cached results that are still within their TTL
+        if cache_key in self._search_cache:
+            timestamp, cached_results = self._search_cache[cache_key]
+            if current_time - timestamp < SEARCH_CACHE_TTL:
+                logger.info(f"Using cached results for '{text}', total: {len(cached_results)}")
+                if offset >= len(cached_results):
+                    return [], len(cached_results)  # Offset exceeds available results
+                end_offset = min(offset + return_limit, len(cached_results))
+                return cached_results[offset:end_offset], len(cached_results)
+
+        # No valid cache entry, perform the search
+        try:
+            logger.info(f"Fetching {full_limit} results for '{text}'")
+            full_results = await self.search(text, full_limit, 0)  # Fetch everything from index 0
+
+            # Cache the full result list
+            self._search_cache[cache_key] = (current_time, full_results)
+
+            # Evict the oldest entries if the cache grew past its size limit
+            if len(self._search_cache) > SEARCH_CACHE_SIZE:
+                oldest_keys = sorted(
+                    self._search_cache.keys(),
+                    key=lambda k: self._search_cache[k][0],
+                )[: len(self._search_cache) - SEARCH_CACHE_SIZE]
+                for k in oldest_keys:
+                    del self._search_cache[k]
+
+            if offset >= len(full_results):
+                return [], len(full_results)  # Offset exceeds available results
+            end_offset = min(offset + return_limit, len(full_results))
+            return full_results[offset:end_offset], len(full_results)
+        except Exception as e:
+            logger.error(f"Search with cache error for '{text}': {e}", exc_info=True)
+            return [], 0
+
     async def check_index_status(self):
         """Get detailed statistics about the search index health"""
         if not self.available:
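The fetch-once, slice-per-page behavior of search_with_cache is easiest to see in isolation. A minimal self-contained sketch, with a stub (FakeSearch, illustrative only) standing in for the real txtai call:

    import asyncio
    import time

    SEARCH_CACHE_TTL = 300  # mirrors the service default

    class FakeSearch:
        """Stub of the fetch-once, slice-per-page logic; the canned result
        list stands in for the txtai backend (illustrative only)."""

        def __init__(self):
            self._cache = {}  # {normalized_query: (timestamp, results)}

        async def _backend(self, text, limit):
            # Pretend the search service returned `limit` scored hits
            return [{"id": i, "score": 1.0 - i / limit} for i in range(limit)]

        async def search_with_cache(self, text, full_limit=100, return_limit=20, offset=0):
            key = text.strip().lower()
            now = time.time()
            hit = self._cache.get(key)
            if hit and now - hit[0] < SEARCH_CACHE_TTL:
                results = hit[1]  # cache hit: no backend call
            else:
                results = await self._backend(text, full_limit)
                self._cache[key] = (now, results)
            return results[offset:offset + return_limit], len(results)

    async def demo():
        s = FakeSearch()
        page1, total = await s.search_with_cache("Query ", full_limit=50)
        page2, _ = await s.search_with_cache("query", full_limit=50, offset=20)
        print(len(page1), len(page2), total)  # -> 20 20 50

    asyncio.run(demo())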
@@ -450,6 +524,26 @@ async def search_text(text: str, limit: int = 50, offset: int = 0):
     payload = await search_service.search(text, limit, offset)
     return payload
+
+# New function to support pagination from cached results
+async def search_text_paginated(text: str, return_limit: int = 20, offset: int = 0, full_limit: int = 100):
+    """
+    Search with pagination support, backed by the search cache.
+
+    Args:
+        text: the search query
+        return_limit: how many results to return for the current page
+        offset: starting offset for pagination
+        full_limit: maximum number of results to fetch and cache
+
+    Returns:
+        tuple: (results_for_page, total_results_count)
+    """
+    results = []
+    total = 0
+    if search_service.available:
+        results, total = await search_service.search_with_cache(text, full_limit, return_limit, offset)
+    return results, total
 
 
 async def initialize_search_index(shouts_data):
     """Initialize search index with existing data during application startup"""