From 0375939e738a8dbefd33218802c86270372c3fb0 Mon Sep 17 00:00:00 2001
From: Untone
Date: Tue, 3 Jun 2025 02:10:08 +0300
Subject: [PATCH] hardcopy-search-service-code

---
 services/search.py | 472 +++++++++++++++++++++------------------------
 1 file changed, 218 insertions(+), 254 deletions(-)

diff --git a/services/search.py b/services/search.py
index 411a7564..607eb16f 100644
--- a/services/search.py
+++ b/services/search.py
@@ -2,34 +2,35 @@ import asyncio
 import json
 import logging
 import os
-import random
+import secrets
 import time
-from typing import Any, Union
+from typing import Any, Optional
 
 import httpx
 
-from orm.shout import Shout
-from settings import TXTAI_SERVICE_URL
-from utils.logger import root_logger as logger
-
 # Set up proper logging
+logger = logging.getLogger("search")
 logger.setLevel(logging.INFO)  # Change to INFO to see more details
-# Disable noise HTTP cltouchient logging
+# Disable noisy HTTP client logging
 logging.getLogger("httpx").setLevel(logging.WARNING)
 logging.getLogger("httpcore").setLevel(logging.WARNING)
 
 # Configuration for search service
 SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
+TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
 MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
 
 # Search cache configuration
 SEARCH_CACHE_ENABLED = bool(os.environ.get("SEARCH_CACHE_ENABLED", "true").lower() in ["true", "1", "yes"])
-SEARCH_CACHE_TTL_SECONDS = int(os.environ.get("SEARCH_CACHE_TTL_SECONDS", "300"))  # Default: 5 minutes
+SEARCH_CACHE_TTL_SECONDS = int(os.environ.get("SEARCH_CACHE_TTL_SECONDS", "300"))  # Default: 15 minutes
 SEARCH_PREFETCH_SIZE = int(os.environ.get("SEARCH_PREFETCH_SIZE", "200"))
 SEARCH_USE_REDIS = bool(os.environ.get("SEARCH_USE_REDIS", "true").lower() in ["true", "1", "yes"])
 
 search_offset = 0
 
+# Global collection for background tasks
+background_tasks = []
+
 # Import Redis client if Redis caching is enabled
 if SEARCH_USE_REDIS:
     try:
@@ -45,8 +46,8 @@ class SearchCache:
     """Cache for search results to enable efficient pagination"""

     def __init__(self, ttl_seconds: int = SEARCH_CACHE_TTL_SECONDS, max_items: int = 100) -> None:
-        self.cache: dict[str, list] = {}  # Maps search query to list of results
-        self.last_accessed: dict[str, float] = {}  # Maps search query to last access timestamp
+        self.cache = {}  # Maps search query to list of results
+        self.last_accessed = {}  # Maps search query to last access timestamp
         self.ttl = ttl_seconds
         self.max_items = max_items
         self._redis_prefix = "search_cache:"
@@ -58,7 +59,7 @@ class SearchCache:
         if SEARCH_USE_REDIS:
             try:
                 serialized_results = json.dumps(results)
-                await redis.serialize_and_set(
+                await redis.set(
                     f"{self._redis_prefix}{normalized_query}",
                     serialized_results,
                     ex=self.ttl,
@@ -79,7 +80,7 @@ class SearchCache:
         logger.info(f"Cached {len(results)} search results for query '{query}' in memory")
         return True

-    async def get(self, query: str, limit: int = 10, offset: int = 0) -> list[dict] | None:
+    async def get(self, query: str, limit: int = 10, offset: int = 0) -> Optional[list]:
         """Get paginated results for a query"""
         normalized_query = self._normalize_query(query)
         all_results = None
@@ -169,7 +170,7 @@ class SearchCache:
                 if key in self.last_accessed:
                     del self.last_accessed[key]

-            logger.info("Cleaned up %d expired search cache entries", len(expired_keys))
+            logger.info(f"Cleaned up {len(expired_keys)} expired search cache entries")

         # If still above max size, remove oldest entries
         if len(self.cache) >= self.max_items:
@@ -182,12 +183,12 @@ class SearchCache:
                 del self.cache[key]
                 if key in self.last_accessed:
                     del self.last_accessed[key]
-            logger.info("Removed %d oldest search cache entries", remove_count)
+            logger.info(f"Removed {remove_count} oldest search cache entries")


 class SearchService:
     def __init__(self) -> None:
-        logger.info("Initializing search service with URL: %s", TXTAI_SERVICE_URL)
+        logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
         self.available = SEARCH_ENABLED
         # Use different timeout settings for indexing and search requests
         self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
@@ -202,69 +203,81 @@ class SearchService:
             cache_location = "Redis" if SEARCH_USE_REDIS else "Memory"
             logger.info(f"Search caching enabled using {cache_location} cache with TTL={SEARCH_CACHE_TTL_SECONDS}s")

-    async def info(self) -> dict[str, Any]:
-        """Check search service info"""
-        if not SEARCH_ENABLED:
-            return {"status": "disabled", "message": "Search is disabled"}
-
+    async def info(self) -> dict:
+        """Return information about search service"""
+        if not self.available:
+            return {"status": "disabled"}
         try:
-            async with httpx.AsyncClient() as client:
-                response = await client.get(f"{TXTAI_SERVICE_URL}/info")
+            response = await self.client.get("/info")
             response.raise_for_status()
             result = response.json()
             logger.info(f"Search service info: {result}")
             return result
-        except (httpx.ConnectError, httpx.ConnectTimeout) as e:
-            # Используем debug уровень для ошибок подключения
-            logger.debug("Search service connection failed: %s", str(e))
-            return {"status": "error", "message": str(e)}
-        except Exception as e:
-            # Другие ошибки логируем как debug
-            logger.debug("Failed to get search info: %s", str(e))
-            return {"status": "error", "message": str(e)}
+        except Exception:
+            logger.exception("Failed to get search info")
+            return {"status": "error", "message": "Failed to get search info"}

     def is_ready(self) -> bool:
         """Check if service is available"""
         return self.available

-    async def verify_docs(self, doc_ids: list[int]) -> dict[str, Any]:
+    async def verify_docs(self, doc_ids: list) -> dict:
         """Verify which documents exist in the search index across all content types"""
         if not self.available:
-            return {"status": "error", "message": "Search service not available"}
+            return {"status": "disabled"}

         try:
-            # Check documents across all content types
-            results = {}
-            for content_type in ["shouts", "authors", "topics"]:
-                endpoint = f"{TXTAI_SERVICE_URL}/exists/{content_type}"
-                async with httpx.AsyncClient() as client:
-                    response = await client.post(endpoint, json={"ids": doc_ids})
-                    response.raise_for_status()
-                    results[content_type] = response.json()
+            logger.info(f"Verifying {len(doc_ids)} documents in search index")
+            response = await self.client.post(
+                "/verify-docs",
+                json={"doc_ids": doc_ids},
+                timeout=60.0,  # Longer timeout for potentially large ID lists
+            )
+            response.raise_for_status()
+            result = response.json()
+
+            # Process the more detailed response format
+            bodies_missing = set(result.get("bodies", {}).get("missing", []))
+            titles_missing = set(result.get("titles", {}).get("missing", []))
+
+            # Combine missing IDs from both bodies and titles
+            # A document is considered missing if it's missing from either index
+            all_missing = list(bodies_missing.union(titles_missing))
+
+            # Log summary of verification results
+            bodies_missing_count = len(bodies_missing)
+            titles_missing_count = len(titles_missing)
+            total_missing_count = len(all_missing)
+
+            logger.info(
+                f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing"
+            )
+            logger.info(f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total")
+
+            # Return in a backwards-compatible format plus the detailed breakdown
             return {
-                "status": "success",
-                "verified": results,
-                "total_docs": len(doc_ids),
+                "missing": all_missing,
+                "details": {
+                    "bodies_missing": list(bodies_missing),
+                    "titles_missing": list(titles_missing),
+                    "bodies_missing_count": bodies_missing_count,
+                    "titles_missing_count": titles_missing_count,
+                },
             }
-        except Exception as e:
+        except Exception:
             logger.exception("Document verification error")
-            return {"status": "error", "message": str(e)}
+            return {"status": "error", "message": "Document verification error"}

-    def index(self, shout: Shout) -> None:
+    def index(self, shout: Any) -> None:
         """Index a single document"""
         if not self.available:
             return

-        logger.info(f"Indexing post {shout.id}")
-        # Start in background to not block
-        task = asyncio.create_task(self.perform_index(shout))
-        # Store task reference to prevent garbage collection
-        self._background_tasks: set[asyncio.Task[None]] = getattr(self, "_background_tasks", set())
-        self._background_tasks.add(task)
-        task.add_done_callback(self._background_tasks.discard)
+        # Start in background to not block - store reference in a background collection
+        # to prevent garbage collection while keeping the method non-blocking
+        background_tasks.append(asyncio.create_task(self.perform_index(shout)))

-    async def perform_index(self, shout: Shout) -> None:
+    async def perform_index(self, shout: Any) -> None:
         """Index a single document across multiple endpoints"""
         if not self.available:
             return

@@ -298,11 +311,9 @@ class SearchService:
                                body_text_parts.append(media_json["body"])
                    except json.JSONDecodeError:
                        body_text_parts.append(media)
                elif isinstance(media, dict):
-                    if "title" in media:
-                        body_text_parts.append(media["title"])
-                    if "body" in media:
-                        body_text_parts.append(media["body"])
+                    if media.get("title"):
+                        body_text_parts.append(media["title"])
+                    if media.get("body"):
+                        body_text_parts.append(media["body"])

            if body_text_parts:
                body_text = " ".join(body_text_parts)
@@ -346,36 +357,32 @@ class SearchService:
                # Check for errors in responses
                for i, response in enumerate(responses):
                    if isinstance(response, Exception):
-                        logger.error("Error in indexing task %d: %s", i, response)
+                        logger.error(f"Error in indexing task {i}: {response}")
                    elif hasattr(response, "status_code") and response.status_code >= 400:
-                        error_text = ""
-                        if hasattr(response, "text") and callable(response.text):
-                            try:
-                                error_text = await response.text()
-                            except (Exception, httpx.HTTPError):
-                                error_text = str(response)
-                        logger.error("Error response in indexing task %d: %d, %s", i, response.status_code, error_text)
+                        logger.error(
+                            f"Error response in indexing task {i}: {response.status_code}, {response.text}"
+                        )

-                logger.info("Document %s indexed across %d endpoints", shout.id, len(indexing_tasks))
+                logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints")
            else:
-                logger.warning("No content to index for shout %s", shout.id)
+                logger.warning(f"No content to index for shout {shout.id}")

        except Exception:
-            logger.exception("Indexing error for shout %s", shout.id)
+            logger.exception(f"Indexing error for shout {shout.id}")

-    async def bulk_index(self, shouts: list[Shout]) -> None:
+    async def bulk_index(self, shouts: list) -> None:
         """Index multiple documents across three separate endpoints"""
         if not self.available or not shouts:
             logger.warning(
-                "Bulk indexing skipped: available=%s, shouts_count=%d", self.available, len(shouts) if shouts else 0
+                f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}"
             )
             return

         start_time = time.time()
-        logger.info("Starting multi-endpoint bulk indexing of %d documents", len(shouts))
+        logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents")

         # Prepare documents for different endpoints
-        title_docs: list[dict[str, Any]] = []
+        title_docs = []
         body_docs = []
         author_docs = {}  # Use dict to prevent duplicate authors

@@ -407,11 +414,9 @@ class SearchService:
                                body_text_parts.append(media_json["body"])
                    except json.JSONDecodeError:
                        body_text_parts.append(media)
                elif isinstance(media, dict):
-                    if "title" in media:
-                        body_text_parts.append(media["title"])
-                    if "body" in media:
-                        body_text_parts.append(media["body"])
+                    if media.get("title"):
+                        body_text_parts.append(media["title"])
+                    if media.get("body"):
+                        body_text_parts.append(media["body"])

                # Only add body document if we have body text
                if body_text_parts:
@@ -457,7 +462,7 @@ class SearchService:
                }

            except Exception:
-                logger.exception("Error processing shout %s for indexing", getattr(shout, "id", "unknown"))
+                logger.exception(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing")
                total_skipped += 1

        # Convert author dict to list
@@ -477,21 +482,18 @@ class SearchService:
        elapsed = time.time() - start_time

        logger.info(
-            "Multi-endpoint indexing completed in %.2fs: %d titles, %d bodies, %d authors, %d shouts skipped",
-            elapsed,
-            len(title_docs),
-            len(body_docs),
-            len(author_docs_list),
-            total_skipped,
+            f"Multi-endpoint indexing completed in {elapsed:.2f}s: "
+            f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, "
+            f"{total_skipped} shouts skipped"
        )

-    async def _index_endpoint(self, documents: list[dict], endpoint: str, doc_type: str) -> None:
+    async def _index_endpoint(self, documents: list, endpoint: str, doc_type: str) -> None:
        """Process and index documents to a specific endpoint"""
        if not documents:
-            logger.info("No %s documents to index", doc_type)
+            logger.info(f"No {doc_type} documents to index")
            return

-        logger.info("Indexing %d %s documents", len(documents), doc_type)
+        logger.info(f"Indexing {len(documents)} {doc_type} documents")

        # Categorize documents by size
        small_docs, medium_docs, large_docs = self._categorize_by_size(documents, doc_type)
@@ -512,7 +514,7 @@ class SearchService:
            batch_size = batch_sizes[category]
            await self._process_batches(docs, batch_size, endpoint, f"{doc_type}-{category}")

-    def _categorize_by_size(self, documents: list[dict], doc_type: str) -> tuple[list[dict], list[dict], list[dict]]:
+    def _categorize_by_size(self, documents: list, doc_type: str) -> tuple[list, list, list]:
        """Categorize documents by size for optimized batch processing"""
        small_docs = []
        medium_docs = []
@@ -538,15 +540,11 @@ class SearchService:
                small_docs.append(doc)

        logger.info(
-            "%s documents categorized: %d small, %d medium, %d large",
-            doc_type.capitalize(),
-            len(small_docs),
-            len(medium_docs),
-            len(large_docs),
+            f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large"
        )
        return small_docs, medium_docs, large_docs

-    async def _process_batches(self, documents: list[dict], batch_size: int, endpoint: str, batch_prefix: str) -> None:
+    async def _process_batches(self, documents: list, batch_size: int, endpoint: str, batch_prefix: str) -> None:
         """Process document batches with retry logic"""
         for i in range(0, len(documents), batch_size):
             batch = documents[i : i + batch_size]
@@ -563,9 +561,7 @@ class SearchService:
                    if response.status_code == 422:
                        error_detail = response.json()
                        logger.error(
-                            "Validation error from search service for batch %s: %s",
-                            batch_id,
-                            self._truncate_error_detail(error_detail),
+                            f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}"
                        )
                        break

@@ -591,14 +587,14 @@ class SearchService:
                            )
                        else:
                            logger.exception(
-                                "Failed to index single document in batch %s after %d attempts", batch_id, max_retries
+                                f"Failed to index single document in batch {batch_id} after {max_retries} attempts"
                            )
                        break

-                    wait_time = (2**retry_count) + (random.SystemRandom().random() * 0.5)
+                    wait_time = (2**retry_count) + (secrets.SystemRandom().random() * 0.5)
                    await asyncio.sleep(wait_time)

-    def _truncate_error_detail(self, error_detail: Union[dict, str, int]) -> Union[dict, str, int]:
+    def _truncate_error_detail(self, error_detail: Any) -> Any:
        """Truncate error details for logging"""
        truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail

@@ -632,174 +628,115 @@ class SearchService:

        return truncated_detail

-    async def search(self, text: str, limit: int, offset: int) -> list[dict]:
+    async def search(self, text: str, limit: int, offset: int) -> list:
        """Search documents"""
        if not self.available:
-            logger.warning("Search service not available")
            return []

-        if not text or not text.strip():
-            logger.warning("Empty search query provided")
-            return []
-
-        # Устанавливаем общий размер выборки поиска
-        search_limit = SEARCH_PREFETCH_SIZE if SEARCH_CACHE_ENABLED else limit
-
-        logger.info("Searching for: '%s' (limit=%d, offset=%d, search_limit=%d)", text, limit, offset, search_limit)
+        # Check if we can serve from cache
+        if SEARCH_CACHE_ENABLED:
+            has_cache = await self.cache.has_query(text)
+            if has_cache:
+                cached_results = await self.cache.get(text, limit, offset)
+                if cached_results is not None:
+                    return cached_results

+        # Not in cache or cache disabled, perform new search
        try:
+            # Decide whether to prefetch and cache or just get what we need
+            search_limit = SEARCH_PREFETCH_SIZE if SEARCH_CACHE_ENABLED else limit
+
+            logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})")
+
            response = await self.client.post(
                "/search-combined",
                json={"text": text, "limit": search_limit},
            )
+            response.raise_for_status()
+            result = response.json()
+            formatted_results = result.get("results", [])

-            logger.debug(f"Search service response status: {response.status_code}")

+            # filter out non-numeric IDs
+            valid_results = [r for r in formatted_results if r.get("id", "").isdigit()]
+            if len(valid_results) != len(formatted_results):
+                formatted_results = valid_results

-            if response.status_code != 200:
-                logger.error(f"Search service returned status {response.status_code}: {response.text}")
-                return []
-
-            results = response.json()
-            logger.debug(f"Raw search results: {len(results) if results else 0} items")
-
-            if not results or not isinstance(results, list):
-                logger.warning(f"No search results or invalid format for query '{text}'")
-                return []
-
-            # Обрабатываем каждый результат
-            formatted_results = []
-            for i, item in enumerate(results):
-                if isinstance(item, dict):
-                    formatted_result = self._format_search_result(item)
-                    formatted_results.append(formatted_result)
-                    logger.debug(
-                        f"Formatted result {i}: id={formatted_result.get('id')}, title={formatted_result.get('title', '')[:50]}..."
-                    )
-                else:
-                    logger.warning(f"Invalid search result item {i}: {type(item)}")
-
-            logger.info(f"Successfully formatted {len(formatted_results)} search results for '{text}'")
-
-            # Сохраняем результаты в кеше
-            if SEARCH_CACHE_ENABLED and self.cache:
+            if SEARCH_CACHE_ENABLED:
+                # Store the full prefetch batch, then page it
                 await self.cache.store(text, formatted_results)
-                logger.debug(f"Stored {len(formatted_results)} results in cache for '{text}'")
-
-            # Если включен кеш и есть лишние результаты
-            if SEARCH_CACHE_ENABLED and self.cache and await self.cache.has_query(text):
-                cached_result = await self.cache.get(text, limit, offset)
-                logger.debug(f"Retrieved {len(cached_result) if cached_result else 0} results from cache for '{text}'")
-                return cached_result or []
+                return await self.cache.get(text, limit, offset)

             return formatted_results
-
-        except Exception as e:
-            logger.error(f"Search error for '{text}': {e}", exc_info=True)
+        except Exception:
+            logger.exception(f"Search error for '{text}'")
             return []

-    async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list[dict]:
+    async def search_authors(self, text: str, limit: int = 10, offset: int = 0) -> list:
         """Search only for authors using the specialized endpoint"""
         if not self.available or not text.strip():
             return []

-        # Кеш для авторов
         cache_key = f"author:{text}"
-        if SEARCH_CACHE_ENABLED and self.cache and await self.cache.has_query(cache_key):
-            cached_results = await self.cache.get(cache_key, limit, offset)
-            if cached_results:
-                return cached_results
+
+        # Check if we can serve from cache
+        if SEARCH_CACHE_ENABLED:
+            has_cache = await self.cache.has_query(cache_key)
+            if has_cache:
+                cached_results = await self.cache.get(cache_key, limit, offset)
+                if cached_results is not None:
+                    return cached_results
+
+        # Not in cache or cache disabled, perform new search
         try:
-            # Устанавливаем общий размер выборки поиска
             search_limit = SEARCH_PREFETCH_SIZE if SEARCH_CACHE_ENABLED else limit

             logger.info(
-                "Searching authors for: '%s' (limit=%d, offset=%d, search_limit=%d)", text, limit, offset, search_limit
+                f"Searching authors for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})"
             )

             response = await self.client.post("/search-author", json={"text": text, "limit": search_limit})
+            response.raise_for_status()

-            results = await response.json()
-            if not results or not isinstance(results, list):
-                return []
+            result = response.json()
+            author_results = result.get("results", [])

-            # Форматируем результаты поиска авторов
-            author_results = []
-            for item in results:
-                if isinstance(item, dict):
-                    formatted_author = self._format_author_result(item)
-                    author_results.append(formatted_author)
+            # Filter out any invalid results if necessary
+            valid_results = [r for r in author_results if r.get("id", "").isdigit()]
+            if len(valid_results) != len(author_results):
+                author_results = valid_results

-            # Сохраняем результаты в кеше
-            if SEARCH_CACHE_ENABLED and self.cache:
+            if SEARCH_CACHE_ENABLED:
+                # Store the full prefetch batch, then page it
                 await self.cache.store(cache_key, author_results)
+                return await self.cache.get(cache_key, limit, offset)

-            # Возвращаем нужную порцию результатов
             return author_results[offset : offset + limit]
         except Exception:
-            logger.exception("Error searching authors for '%s'", text)
+            logger.exception(f"Error searching authors for '{text}'")
             return []

     async def check_index_status(self) -> dict:
         """Get detailed statistics about the search index health"""
         if not self.available:
-            return {"status": "unavailable", "message": "Search service not available"}
+            return {"status": "disabled"}

         try:
-            response = await self.client.post("/check-index")
-            result = await response.json()
+            response = await self.client.get("/index-status")
+            response.raise_for_status()
+            result = response.json()

-            if isinstance(result, dict):
-                # Проверяем на NULL эмбеддинги
+            if result.get("consistency", {}).get("status") != "ok":
                 null_count = result.get("consistency", {}).get("null_embeddings_count", 0)
                 if null_count > 0:
-                    logger.warning("Found %d documents with NULL embeddings", null_count)
-        except Exception as e:
-            logger.exception("Failed to check index status")
-            return {"status": "error", "message": str(e)}
-        else:
+                    logger.warning(f"Found {null_count} documents with NULL embeddings")
+
             return result
-
-    def _format_search_result(self, item: dict) -> dict:
-        """Format search result item"""
-        formatted_result = {}
-
-        # Обязательные поля
-        if "id" in item:
-            formatted_result["id"] = item["id"]
-        if "title" in item:
-            formatted_result["title"] = item["title"]
-        if "body" in item:
-            formatted_result["body"] = item["body"]
-
-        # Дополнительные поля
-        for field in ["subtitle", "lead", "author_id", "author_name", "created_at", "stat"]:
-            if field in item:
-                formatted_result[field] = item[field]
-
-        return formatted_result
-
-    def _format_author_result(self, item: dict) -> dict:
-        """Format author search result item"""
-        formatted_result = {}
-
-        # Обязательные поля для автора
-        if "id" in item:
-            formatted_result["id"] = item["id"]
-        if "name" in item:
-            formatted_result["name"] = item["name"]
-        if "username" in item:
-            formatted_result["username"] = item["username"]
-
-        # Дополнительные поля для автора
-        for field in ["slug", "bio", "pic", "created_at", "stat"]:
-            if field in item:
-                formatted_result[field] = item[field]
-
-        return formatted_result
-
-    def close(self) -> None:
-        """Close the search service"""
+        except Exception:
+            logger.exception("Failed to check index status")
+            return {"status": "error", "message": "Failed to check index status"}


 # Create the search service singleton
@@ -808,14 +745,14 @@ search_service = SearchService()


 # API-compatible function to perform a search
-async def search_text(text: str, limit: int = 200, offset: int = 0) -> list[dict]:
+async def search_text(text: str, limit: int = 200, offset: int = 0) -> list:
     payload = []
     if search_service.available:
         payload = await search_service.search(text, limit, offset)
     return payload


-async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list[dict]:
+async def search_author_text(text: str, limit: int = 10, offset: int = 0) -> list:
     """Search authors API helper function"""
     if search_service.available:
         return await search_service.search_authors(text, limit, offset)
@@ -827,11 +764,11 @@ async def get_search_count(text: str) -> int:
     if not search_service.available:
         return 0

-    if SEARCH_CACHE_ENABLED and search_service.cache is not None and await search_service.cache.has_query(text):
+    if SEARCH_CACHE_ENABLED and await search_service.cache.has_query(text):
         return await search_service.cache.get_total_count(text)

-    # Return approximate count for active search
-    return 42  # Placeholder implementation
+    # If not found in cache, fetch from endpoint
+    return len(await search_text(text, SEARCH_PREFETCH_SIZE, 0))


 async def get_author_search_count(text: str) -> int:
@@ -841,31 +778,48 @@ async def get_author_search_count(text: str) -> int:
     if SEARCH_CACHE_ENABLED:
         cache_key = f"author:{text}"
-        if search_service.cache is not None and await search_service.cache.has_query(cache_key):
+        if await search_service.cache.has_query(cache_key):
             return await search_service.cache.get_total_count(cache_key)

-    return 0  # Placeholder implementation
+    # If not found in cache, fetch from endpoint
+    return len(await search_author_text(text, SEARCH_PREFETCH_SIZE, 0))


 async def initialize_search_index(shouts_data: list) -> None:
     """Initialize search index with existing data during application startup"""
     if not SEARCH_ENABLED:
-        logger.info("Search is disabled, skipping index initialization")
         return

-    if not search_service.available:
-        logger.warning("Search service not available, skipping index initialization")
+    if not shouts_data:
         return

+    info = await search_service.info()
+    if info.get("status") in ["error", "unavailable", "disabled"]:
+        return
+
+    index_stats = info.get("index_stats", {})
+    indexed_doc_count = index_stats.get("total_count", 0)
+
+    index_status = await search_service.check_index_status()
+    if index_status.get("status") == "inconsistent":
+        problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", [])
+
+        if problem_ids:
+            problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids]
+            if problem_docs:
+                await search_service.bulk_index(problem_docs)
+
     # Only consider shouts with body content for body verification
-    def has_body_content(shout: dict) -> bool:
+    def has_body_content(shout: Any) -> bool:
         for field in ["subtitle", "lead", "body"]:
-            if hasattr(shout, field) and getattr(shout, field) and getattr(shout, field).strip():
+            if (
+                getattr(shout, field, None)
+                and isinstance(getattr(shout, field, None), str)
+                and getattr(shout, field).strip()
+            ):
                 return True
-
-        # Check media JSON for content
-        if hasattr(shout, "media") and shout.media:
-            media = shout.media
+        media = getattr(shout, "media", None)
+        if media:
             if isinstance(media, str):
                 try:
                     media_json = json.loads(media)
@@ -877,26 +831,36 @@ async def initialize_search_index(shouts_data: list) -> None:
                     return True
         return False

-    total_count = len(shouts_data)
-    processed_count = 0
+    shouts_with_body = [shout for shout in shouts_data if has_body_content(shout)]
+    body_ids = [str(shout.id) for shout in shouts_with_body]

-    # Collect categories while we're at it for informational purposes
-    categories: set = set()
-
-    try:
-        for shout in shouts_data:
-            # Skip items that lack meaningful text content
-            if not has_body_content(shout):
-                continue
-
-            # Track categories
-            matching_shouts = [s for s in shouts_data if getattr(s, "id", None) == getattr(shout, "id", None)]
-            if matching_shouts and hasattr(matching_shouts[0], "category"):
-                categories.add(getattr(matching_shouts[0], "category", "unknown"))
-    except (AttributeError, TypeError):
+    if abs(indexed_doc_count - len(shouts_data)) > 10:
+        doc_ids = [str(shout.id) for shout in shouts_data]
+        verification = await search_service.verify_docs(doc_ids)
+        if verification.get("status") == "error":
+            return
+
+        # Only reindex missing docs that actually have body content
+        missing_ids = [mid for mid in verification.get("missing", []) if mid in body_ids]
+        if missing_ids:
+            missing_docs = [shout for shout in shouts_with_body if str(shout.id) in missing_ids]
+            await search_service.bulk_index(missing_docs)
+    else:
         pass

-    logger.info("Search index initialization completed: %d/%d items", processed_count, total_count)
+    try:
+        test_query = "test"
+        # Use body search since that's most likely to return results
+        test_results = await search_text(test_query, 5)
+
+        if test_results:
+            categories = set()
+            for result in test_results:
+                result_id = result.get("id")
+                matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
+                if matching_shouts and hasattr(matching_shouts[0], "category"):
+                    categories.add(getattr(matching_shouts[0], "category", "unknown"))
+    except Exception as ex:
+        logger.warning(f"Test search failed during initialization: {ex}")


 async def check_search_service() -> None: