diff --git a/.gitea/workflows/main.yml b/.gitea/workflows/main.yml index 18730b95..f65ae48a 100644 --- a/.gitea/workflows/main.yml +++ b/.gitea/workflows/main.yml @@ -29,7 +29,16 @@ jobs: if: github.ref == 'refs/heads/dev' uses: dokku/github-action@master with: - branch: 'dev' + branch: 'main' force: true git_remote_url: 'ssh://dokku@v2.discours.io:22/core' ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }} + + - name: Push to dokku for staging branch + if: github.ref == 'refs/heads/staging' + uses: dokku/github-action@master + with: + branch: 'dev' + git_remote_url: 'ssh://dokku@staging.discours.io:22/core' + ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }} + git_push_flags: '--force' \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4db9e7e4..4ba0aec2 100644 --- a/.gitignore +++ b/.gitignore @@ -128,6 +128,9 @@ dmypy.json .idea temp.* +# Debug +DEBUG.log + discours.key discours.crt discours.pem @@ -161,4 +164,5 @@ views.json *.key *.crt *cache.json -.cursor \ No newline at end of file +.cursor +.devcontainer/ diff --git a/main.py b/main.py index ff64c974..536dfdc2 100644 --- a/main.py +++ b/main.py @@ -17,7 +17,8 @@ from cache.revalidator import revalidation_manager from services.exception import ExceptionHandlerMiddleware from services.redis import redis from services.schema import create_all_tables, resolvers -from services.search import search_service +#from services.search import search_service +from services.search import search_service, initialize_search_index from services.viewed import ViewedStorage from services.webhook import WebhookEndpoint, create_webhook_endpoint from settings import DEV_SERVER_PID_FILE_NAME, MODE @@ -34,24 +35,67 @@ async def start(): f.write(str(os.getpid())) print(f"[main] process started in {MODE} mode") +async def check_search_service(): + """Check if search service is available and log result""" + info = await search_service.info() + if info.get("status") in ["error", "unavailable"]: + print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}") + else: + print(f"[INFO] Search service is available: {info}") + +# indexing DB data +# async def indexing(): +# from services.db import fetch_all_shouts +# all_shouts = await fetch_all_shouts() +# await initialize_search_index(all_shouts) async def lifespan(_app): try: + print("[lifespan] Starting application initialization") create_all_tables() await asyncio.gather( redis.connect(), precache_data(), ViewedStorage.init(), create_webhook_endpoint(), - search_service.info(), + check_search_service(), start(), revalidation_manager.start(), ) + print("[lifespan] Basic initialization complete") + + # Add a delay before starting the intensive search indexing + print("[lifespan] Waiting for system stabilization before search indexing...") + await asyncio.sleep(10) # 10-second delay to let the system stabilize + + # Start search indexing as a background task with lower priority + asyncio.create_task(initialize_search_index_background()) + yield finally: + print("[lifespan] Shutting down application services") tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()] await asyncio.gather(*tasks, return_exceptions=True) + print("[lifespan] Shutdown complete") +# Initialize search index in the background +async def initialize_search_index_background(): + """Run search indexing as a background task with low priority""" + try: + print("[search] Starting background search indexing process") + from services.db import fetch_all_shouts + + # Get total count first (optional) + all_shouts = await fetch_all_shouts() + total_count = len(all_shouts) if all_shouts else 0 + print(f"[search] Fetched {total_count} shouts for background indexing") + + # Start the indexing process with the fetched shouts + print("[search] Beginning background search index initialization...") + await initialize_search_index(all_shouts) + print("[search] Background search index initialization complete") + except Exception as e: + print(f"[search] Error in background search indexing: {str(e)}") # Создаем экземпляр GraphQL graphql_app = GraphQL(schema, debug=True) diff --git a/orm/shout.py b/orm/shout.py index d74e84d4..30a8dc69 100644 --- a/orm/shout.py +++ b/orm/shout.py @@ -71,6 +71,34 @@ class ShoutAuthor(Base): class Shout(Base): """ Публикация в системе. + + Attributes: + body (str) + slug (str) + cover (str) : "Cover image url" + cover_caption (str) : "Cover image alt caption" + lead (str) + title (str) + subtitle (str) + layout (str) + media (dict) + authors (list[Author]) + topics (list[Topic]) + reactions (list[Reaction]) + lang (str) + version_of (int) + oid (str) + seo (str) : JSON + draft (int) + created_at (int) + updated_at (int) + published_at (int) + featured_at (int) + deleted_at (int) + created_by (int) + updated_by (int) + deleted_by (int) + community (int) """ __tablename__ = "shout" diff --git a/requirements.txt b/requirements.txt index ad4acff6..08f492e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,10 @@ starlette gql ariadne granian + +# NLP and search +httpx + orjson pydantic trafilatura \ No newline at end of file diff --git a/resolvers/reader.py b/resolvers/reader.py index a8d6b026..aeb60e50 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.topic import Topic from services.db import json_array_builder, json_builder, local_session from services.schema import query -from services.search import search_text +from services.search import search_text, get_search_count from services.viewed import ViewedStorage from utils.logger import root_logger as logger @@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0): """ shouts = [] try: - # logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}") q = q.limit(limit).offset(offset) with local_session() as session: shouts_result = session.execute(q).all() - # logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query") if not shouts_result: logger.warning("No shouts found in query result") @@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0): shout = None if hasattr(row, "Shout"): shout = row.Shout - # logger.debug(f"Processing shout#{shout.id} at index {idx}") if shout: shout_id = int(f"{shout.id}") shout_dict = shout.dict() @@ -231,20 +228,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0): topics = None if has_field(info, "topics") and hasattr(row, "topics"): topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics - # logger.debug(f"Shout#{shout_id} topics: {topics}") shout_dict["topics"] = topics if has_field(info, "main_topic"): main_topic = None if hasattr(row, "main_topic"): - # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}") main_topic = ( orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic ) - # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}") if not main_topic and topics and len(topics) > 0: - # logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list") main_topic = { "id": topics[0]["id"], "title": topics[0]["title"], @@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0): "is_main": True, } elif not main_topic: - logger.warning(f"No main_topic and no topics found for shout#{shout_id}") main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True} shout_dict["main_topic"] = main_topic - # logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}") if has_field(info, "authors") and hasattr(row, "authors"): shout_dict["authors"] = ( @@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0): logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True) raise finally: - logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links") return shouts @@ -401,8 +391,17 @@ async def load_shouts_search(_, info, text, options): """ limit = options.get("limit", 10) offset = options.get("offset", 0) + if isinstance(text, str) and len(text) > 2: + # Get search results with pagination results = await search_text(text, limit, offset) + + # If no results, return empty list + if not results: + logger.info(f"No search results found for '{text}'") + return [] + + # Extract IDs and scores scores = {} hits_ids = [] for sr in results: @@ -412,22 +411,42 @@ async def load_shouts_search(_, info, text, options): scores[shout_id] = sr.get("score") hits_ids.append(shout_id) - q = ( - query_with_stat(info) - if has_field(info, "stat") - else select(Shout).filter(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None))) - ) + # Query DB for only the IDs in the current page + q = query_with_stat(info) q = q.filter(Shout.id.in_(hits_ids)) - q = apply_filters(q, options) - q = apply_sorting(q, options) - shouts = get_shouts_with_links(info, q, limit, offset) + q = apply_filters(q, options.get("filters", {})) + + # + shouts = get_shouts_with_links(info, q, len(hits_ids), 0) + + # Add scores from search results for shout in shouts: - shout.score = scores[f"{shout.id}"] - shouts.sort(key=lambda x: x.score, reverse=True) + shout_id = str(shout['id']) + shout["score"] = scores.get(shout_id, 0) + + # Re-sort by search score to maintain ranking + shouts.sort(key=lambda x: scores.get(str(x['id']), 0), reverse=True) + return shouts return [] +@query.field("get_search_results_count") +async def get_search_results_count(_, info, text): + """ + Returns the total count of search results for a search query. + + :param _: Root query object (unused) + :param info: GraphQL context information + :param text: Search query text + :return: Total count of results + """ + if isinstance(text, str) and len(text) > 2: + count = await get_search_count(text) + return {"count": count} + return {"count": 0} + + @query.field("load_shouts_unrated") async def load_shouts_unrated(_, info, options): """ diff --git a/schema/query.graphql b/schema/query.graphql index e07954ae..96055bc6 100644 --- a/schema/query.graphql +++ b/schema/query.graphql @@ -33,6 +33,7 @@ type Query { get_shout(slug: String, shout_id: Int): Shout load_shouts_by(options: LoadShoutsOptions): [Shout] load_shouts_search(text: String!, options: LoadShoutsOptions): [SearchResult] + get_search_results_count(text: String!): CountResult! load_shouts_bookmarked(options: LoadShoutsOptions): [Shout] # rating diff --git a/schema/type.graphql b/schema/type.graphql index d902d143..eb8e2770 100644 --- a/schema/type.graphql +++ b/schema/type.graphql @@ -207,6 +207,7 @@ type CommonResult { } type SearchResult { + id: Int! slug: String! title: String! cover: String @@ -274,3 +275,7 @@ type MyRateComment { my_rate: ReactionKind } +type CountResult { + count: Int! +} + diff --git a/server.py b/server.py new file mode 100644 index 00000000..281f50ff --- /dev/null +++ b/server.py @@ -0,0 +1,34 @@ +import sys +from pathlib import Path + +from granian.constants import Interfaces +from granian.log import LogLevels +from granian.server import Server + +from settings import PORT +from utils.logger import root_logger as logger + +if __name__ == "__main__": + logger.info("started") + try: + + granian_instance = Server( + "main:app", + address="0.0.0.0", + port=PORT, + interface=Interfaces.ASGI, + workers=1, + websockets=False, + log_level=LogLevels.debug, + backlog=2048, + ) + + if "dev" in sys.argv: + logger.info("dev mode, building ssl context") + granian_instance.build_ssl_context(cert=Path("localhost.pem"), key=Path("localhost-key.pem"), password=None) + granian_instance.serve() + except Exception as error: + logger.error(error, exc_info=True) + raise + finally: + logger.info("stopped") diff --git a/services/db.py b/services/db.py index e9a6b58c..935c25b4 100644 --- a/services/db.py +++ b/services/db.py @@ -19,7 +19,7 @@ from sqlalchemy import ( inspect, text, ) -from sqlalchemy.orm import Session, configure_mappers, declarative_base +from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload from sqlalchemy.sql.schema import Table from settings import DB_URL @@ -259,3 +259,32 @@ def get_json_builder(): # Используем их в коде json_builder, json_array_builder, json_cast = get_json_builder() + +# Fetch all shouts, with authors preloaded +# This function is used for search indexing + +async def fetch_all_shouts(session=None): + """Fetch all published shouts for search indexing with authors preloaded""" + from orm.shout import Shout + + close_session = False + if session is None: + session = local_session() + close_session = True + + try: + # Fetch only published and non-deleted shouts with authors preloaded + query = session.query(Shout).options( + joinedload(Shout.authors) + ).filter( + Shout.published_at.is_not(None), + Shout.deleted_at.is_(None) + ) + shouts = query.all() + return shouts + except Exception as e: + logger.error(f"Error fetching shouts for search indexing: {e}") + return [] + finally: + if close_session: + session.close() \ No newline at end of file diff --git a/services/search.py b/services/search.py index e9257436..33fe9712 100644 --- a/services/search.py +++ b/services/search.py @@ -2,231 +2,886 @@ import asyncio import json import logging import os +import httpx +import time +from collections import defaultdict +from datetime import datetime, timedelta -import orjson -from opensearchpy import OpenSearch - -from services.redis import redis -from utils.encoders import CustomJSONEncoder - -# Set redis logging level to suppress DEBUG messages +# Set up proper logging logger = logging.getLogger("search") -logger.setLevel(logging.WARNING) +logger.setLevel(logging.INFO) # Change to INFO to see more details -ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "") -ELASTIC_USER = os.environ.get("ELASTIC_USER", "") -ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "") -ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200) -ELASTIC_URL = os.environ.get( - "ELASTIC_URL", - f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}", -) -REDIS_TTL = 86400 # 1 день в секундах +# Configuration for search service +SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"]) +TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none") +MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25")) -index_settings = { - "settings": { - "index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"}, - "analysis": { - "analyzer": { - "ru": { - "tokenizer": "standard", - "filter": ["lowercase", "ru_stop", "ru_stemmer"], - } - }, - "filter": { - "ru_stemmer": {"type": "stemmer", "language": "russian"}, - "ru_stop": {"type": "stop", "stopwords": "_russian_"}, - }, - }, - }, - "mappings": { - "properties": { - "body": {"type": "text", "analyzer": "ru"}, - "title": {"type": "text", "analyzer": "ru"}, - "subtitle": {"type": "text", "analyzer": "ru"}, - "lead": {"type": "text", "analyzer": "ru"}, - "media": {"type": "text", "analyzer": "ru"}, - } - }, -} +# Search cache configuration +SEARCH_CACHE_ENABLED = bool(os.environ.get("SEARCH_CACHE_ENABLED", "true").lower() in ["true", "1", "yes"]) +SEARCH_CACHE_TTL_SECONDS = int(os.environ.get("SEARCH_CACHE_TTL_SECONDS", "900")) # Default: 15 minutes +SEARCH_MIN_SCORE = float(os.environ.get("SEARCH_MIN_SCORE", "0.1")) +SEARCH_PREFETCH_SIZE = int(os.environ.get("SEARCH_PREFETCH_SIZE", "200")) +SEARCH_USE_REDIS = bool(os.environ.get("SEARCH_USE_REDIS", "true").lower() in ["true", "1", "yes"]) -expected_mapping = index_settings["mappings"] +search_offset = 0 -# Создание цикла событий -search_loop = asyncio.get_event_loop() +# Import Redis client if Redis caching is enabled +if SEARCH_USE_REDIS: + try: + from services.redis import redis + logger.info("Redis client imported for search caching") + except ImportError: + logger.warning("Redis client import failed, falling back to memory cache") + SEARCH_USE_REDIS = False -# В начале файла добавим флаг -SEARCH_ENABLED = bool(os.environ.get("ELASTIC_HOST", "")) - - -def get_indices_stats(): - indices_stats = search_service.client.cat.indices(format="json") - for index_info in indices_stats: - index_name = index_info["index"] - if not index_name.startswith("."): - index_health = index_info["health"] - index_status = index_info["status"] - pri_shards = index_info["pri"] - rep_shards = index_info["rep"] - docs_count = index_info["docs.count"] - docs_deleted = index_info["docs.deleted"] - store_size = index_info["store.size"] - pri_store_size = index_info["pri.store.size"] - - logger.info(f"Index: {index_name}") - logger.info(f"Health: {index_health}") - logger.info(f"Status: {index_status}") - logger.info(f"Primary Shards: {pri_shards}") - logger.info(f"Replica Shards: {rep_shards}") - logger.info(f"Documents Count: {docs_count}") - logger.info(f"Deleted Documents: {docs_deleted}") - logger.info(f"Store Size: {store_size}") - logger.info(f"Primary Store Size: {pri_store_size}") +class SearchCache: + """Cache for search results to enable efficient pagination""" + + def __init__(self, ttl_seconds=SEARCH_CACHE_TTL_SECONDS, max_items=100): + self.cache = {} # Maps search query to list of results + self.last_accessed = {} # Maps search query to last access timestamp + self.ttl = ttl_seconds + self.max_items = max_items + self._redis_prefix = "search_cache:" + + async def store(self, query, results): + """Store search results for a query""" + normalized_query = self._normalize_query(query) + + if SEARCH_USE_REDIS: + try: + serialized_results = json.dumps(results) + await redis.set( + f"{self._redis_prefix}{normalized_query}", + serialized_results, + ex=self.ttl + ) + logger.info(f"Stored {len(results)} search results for query '{query}' in Redis") + return True + except Exception as e: + logger.error(f"Error storing search results in Redis: {e}") + # Fall back to memory cache if Redis fails + + # First cleanup if needed for memory cache + if len(self.cache) >= self.max_items: + self._cleanup() + + # Store results and update timestamp + self.cache[normalized_query] = results + self.last_accessed[normalized_query] = time.time() + logger.info(f"Cached {len(results)} search results for query '{query}' in memory") + return True + + async def get(self, query, limit=10, offset=0): + """Get paginated results for a query""" + normalized_query = self._normalize_query(query) + all_results = None + # Try to get from Redis first + if SEARCH_USE_REDIS: + try: + cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}") + if cached_data: + all_results = json.loads(cached_data) + logger.info(f"Retrieved search results for '{query}' from Redis") + except Exception as e: + logger.error(f"Error retrieving search results from Redis: {e}") + + # Fall back to memory cache if not in Redis + if all_results is None and normalized_query in self.cache: + all_results = self.cache[normalized_query] + self.last_accessed[normalized_query] = time.time() + logger.info(f"Retrieved search results for '{query}' from memory cache") + + # If not found in any cache + if all_results is None: + logger.info(f"Cache miss for query '{query}'") + return None + + # Return paginated subset + end_idx = min(offset + limit, len(all_results)) + if offset >= len(all_results): + logger.warning(f"Requested offset {offset} exceeds result count {len(all_results)}") + return [] + + logger.info(f"Cache hit for '{query}': serving {offset}:{end_idx} of {len(all_results)} results") + return all_results[offset:end_idx] + + async def has_query(self, query): + """Check if query exists in cache""" + normalized_query = self._normalize_query(query) + + # Check Redis first + if SEARCH_USE_REDIS: + try: + exists = await redis.get(f"{self._redis_prefix}{normalized_query}") + if exists: + return True + except Exception as e: + logger.error(f"Error checking Redis for query existence: {e}") + + # Fall back to memory cache + return normalized_query in self.cache + + async def get_total_count(self, query): + """Get total count of results for a query""" + normalized_query = self._normalize_query(query) + + # Check Redis first + if SEARCH_USE_REDIS: + try: + cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}") + if cached_data: + all_results = json.loads(cached_data) + return len(all_results) + except Exception as e: + logger.error(f"Error getting result count from Redis: {e}") + + # Fall back to memory cache + if normalized_query in self.cache: + return len(self.cache[normalized_query]) + + return 0 + + def _normalize_query(self, query): + """Normalize query string for cache key""" + if not query: + return "" + # Simple normalization - lowercase and strip whitespace + return query.lower().strip() + + def _cleanup(self): + """Remove oldest entries if memory cache is full""" + now = time.time() + # First remove expired entries + expired_keys = [ + key for key, last_access in self.last_accessed.items() + if now - last_access > self.ttl + ] + + for key in expired_keys: + if key in self.cache: + del self.cache[key] + if key in self.last_accessed: + del self.last_accessed[key] + + logger.info(f"Cleaned up {len(expired_keys)} expired search cache entries") + + # If still above max size, remove oldest entries + if len(self.cache) >= self.max_items: + # Sort by last access time + sorted_items = sorted(self.last_accessed.items(), key=lambda x: x[1]) + # Remove oldest 20% + remove_count = max(1, int(len(sorted_items) * 0.2)) + for key, _ in sorted_items[:remove_count]: + if key in self.cache: + del self.cache[key] + if key in self.last_accessed: + del self.last_accessed[key] + logger.info(f"Removed {remove_count} oldest search cache entries") class SearchService: - def __init__(self, index_name="search_index"): - logger.info("Инициализируем поиск...") - self.index_name = index_name - self.client = None - self.lock = asyncio.Lock() - - # Инициализация клиента OpenSearch только если поиск включен - if SEARCH_ENABLED: - try: - self.client = OpenSearch( - hosts=[{"host": ELASTIC_HOST, "port": ELASTIC_PORT}], - http_compress=True, - http_auth=(ELASTIC_USER, ELASTIC_PASSWORD), - use_ssl=True, - verify_certs=False, - ssl_assert_hostname=False, - ssl_show_warn=False, - ) - logger.info("Клиент OpenSearch.org подключен") - search_loop.create_task(self.check_index()) - except Exception as exc: - logger.warning(f"Поиск отключен из-за ошибки подключения: {exc}") - self.client = None - else: - logger.info("Поиск отключен (ELASTIC_HOST не установлен)") - + def __init__(self): + logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}") + self.available = SEARCH_ENABLED + # Use different timeout settings for indexing and search requests + self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) + self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL) + # Initialize search cache + self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None + + if not self.available: + logger.info("Search disabled (SEARCH_ENABLED = False)") + + if SEARCH_CACHE_ENABLED: + cache_location = "Redis" if SEARCH_USE_REDIS else "Memory" + logger.info(f"Search caching enabled using {cache_location} cache with TTL={SEARCH_CACHE_TTL_SECONDS}s") + logger.info(f"Minimum score filter: {SEARCH_MIN_SCORE}, prefetch size: {SEARCH_PREFETCH_SIZE}") + async def info(self): - if not SEARCH_ENABLED: + """Return information about search service""" + if not self.available: return {"status": "disabled"} - try: - return get_indices_stats() + response = await self.client.get("/info") + response.raise_for_status() + result = response.json() + logger.info(f"Search service info: {result}") + return result except Exception as e: logger.error(f"Failed to get search info: {e}") return {"status": "error", "message": str(e)} - - def delete_index(self): - if self.client: - logger.warning(f"[!!!] Удаляем индекс {self.index_name}") - self.client.indices.delete(index=self.index_name, ignore_unavailable=True) - - def create_index(self): - if self.client: - logger.info(f"Создается индекс: {self.index_name}") - self.client.indices.create(index=self.index_name, body=index_settings) - logger.info(f"Индекс {self.index_name} создан") - - async def check_index(self): - if self.client: - logger.info(f"Проверяем индекс {self.index_name}...") - if not self.client.indices.exists(index=self.index_name): - self.create_index() - self.client.indices.put_mapping(index=self.index_name, body=expected_mapping) - else: - logger.info(f"Найден существующий индекс {self.index_name}") - # Проверка и обновление структуры индекса, если необходимо - result = self.client.indices.get_mapping(index=self.index_name) - if isinstance(result, str): - result = orjson.loads(result) - if isinstance(result, dict): - mapping = result.get(self.index_name, {}).get("mappings") - logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}") - expected_keys = expected_mapping["properties"].keys() - if mapping and mapping["properties"].keys() != expected_keys: - logger.info(f"Ожидаемая структура индексации: {expected_mapping}") - logger.warning("[!!!] Требуется переиндексация всех данных") - self.delete_index() - self.client = None - else: - logger.error("клиент не инициализован, невозможно проверить индекс") - - def index(self, shout): - if not SEARCH_ENABLED: - return - - if self.client: - logger.info(f"Индексируем пост {shout.id}") - index_body = { - "body": shout.body, - "title": shout.title, - "subtitle": shout.subtitle, - "lead": shout.lead, - "media": shout.media, - } - asyncio.create_task(self.perform_index(shout, index_body)) - - async def perform_index(self, shout, index_body): - if self.client: - try: - await asyncio.wait_for( - self.client.index(index=self.index_name, id=str(shout.id), body=index_body), timeout=40.0 - ) - except asyncio.TimeoutError: - logger.error(f"Indexing timeout for shout {shout.id}") - except Exception as e: - logger.error(f"Indexing error for shout {shout.id}: {e}") - - async def search(self, text, limit, offset): - if not SEARCH_ENABLED: - return [] - - logger.info(f"Ищем: {text} {offset}+{limit}") - search_body = { - "query": {"multi_match": {"query": text, "fields": ["title", "lead", "subtitle", "body", "media"]}} - } - - if self.client: - search_response = self.client.search( - index=self.index_name, - body=search_body, - size=limit, - from_=offset, - _source=False, - _source_excludes=["title", "body", "subtitle", "media", "lead", "_index"], + + def is_ready(self): + """Check if service is available""" + return self.available + + + async def verify_docs(self, doc_ids): + """Verify which documents exist in the search index across all content types""" + if not self.available: + return {"status": "disabled"} + + try: + logger.info(f"Verifying {len(doc_ids)} documents in search index") + response = await self.client.post( + "/verify-docs", + json={"doc_ids": doc_ids}, + timeout=60.0 # Longer timeout for potentially large ID lists ) - hits = search_response["hits"]["hits"] - results = [{"id": hit["_id"], "score": hit["_score"]} for hit in hits] + response.raise_for_status() + result = response.json() + + # Process the more detailed response format + bodies_missing = set(result.get("bodies", {}).get("missing", [])) + titles_missing = set(result.get("titles", {}).get("missing", [])) + + # Combine missing IDs from both bodies and titles + # A document is considered missing if it's missing from either index + all_missing = list(bodies_missing.union(titles_missing)) + + # Log summary of verification results + bodies_missing_count = len(bodies_missing) + titles_missing_count = len(titles_missing) + total_missing_count = len(all_missing) + + logger.info(f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing") + logger.info(f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total") + + # Return in a backwards-compatible format plus the detailed breakdown + return { + "missing": all_missing, + "details": { + "bodies_missing": list(bodies_missing), + "titles_missing": list(titles_missing), + "bodies_missing_count": bodies_missing_count, + "titles_missing_count": titles_missing_count + } + } + except Exception as e: + logger.error(f"Document verification error: {e}") + return {"status": "error", "message": str(e)} - # если результаты не пустые - if results: - # Кэширование в Redis с TTL - redis_key = f"search:{text}:{offset}+{limit}" - await redis.execute( - "SETEX", - redis_key, - REDIS_TTL, - json.dumps(results, cls=CustomJSONEncoder), + + def index(self, shout): + """Index a single document""" + if not self.available: + return + logger.info(f"Indexing post {shout.id}") + # Start in background to not block + asyncio.create_task(self.perform_index(shout)) + + async def perform_index(self, shout): + """Index a single document across multiple endpoints""" + if not self.available: + return + + try: + logger.info(f"Indexing document {shout.id} to individual endpoints") + indexing_tasks = [] + + # 1. Index title if available + if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str): + title_doc = { + "id": str(shout.id), + "title": shout.title.strip() + } + indexing_tasks.append( + self.index_client.post("/index-title", json=title_doc) ) - return results - return [] + + # 2. Index body content (subtitle, lead, body) + body_text_parts = [] + for field_name in ['subtitle', 'lead', 'body']: + field_value = getattr(shout, field_name, None) + if field_value and isinstance(field_value, str) and field_value.strip(): + body_text_parts.append(field_value.strip()) + + # Process media content if available + media = getattr(shout, 'media', None) + if media: + if isinstance(media, str): + try: + media_json = json.loads(media) + if isinstance(media_json, dict): + if 'title' in media_json: + body_text_parts.append(media_json['title']) + if 'body' in media_json: + body_text_parts.append(media_json['body']) + except json.JSONDecodeError: + body_text_parts.append(media) + elif isinstance(media, dict): + if 'title' in media: + body_text_parts.append(media['title']) + if 'body' in media: + body_text_parts.append(media['body']) + + if body_text_parts: + body_text = " ".join(body_text_parts) + # Truncate if too long + MAX_TEXT_LENGTH = 4000 + if len(body_text) > MAX_TEXT_LENGTH: + body_text = body_text[:MAX_TEXT_LENGTH] + + body_doc = { + "id": str(shout.id), + "body": body_text + } + indexing_tasks.append( + self.index_client.post("/index-body", json=body_doc) + ) + + # 3. Index authors + authors = getattr(shout, 'authors', []) + for author in authors: + author_id = str(getattr(author, 'id', 0)) + if not author_id or author_id == '0': + continue + + name = getattr(author, 'name', '') + + # Combine bio and about fields + bio_parts = [] + bio = getattr(author, 'bio', '') + if bio and isinstance(bio, str): + bio_parts.append(bio.strip()) + + about = getattr(author, 'about', '') + if about and isinstance(about, str): + bio_parts.append(about.strip()) + + combined_bio = " ".join(bio_parts) + + if name: + author_doc = { + "id": author_id, + "name": name, + "bio": combined_bio + } + indexing_tasks.append( + self.index_client.post("/index-author", json=author_doc) + ) + + # Run all indexing tasks in parallel + if indexing_tasks: + responses = await asyncio.gather(*indexing_tasks, return_exceptions=True) + + # Check for errors in responses + for i, response in enumerate(responses): + if isinstance(response, Exception): + logger.error(f"Error in indexing task {i}: {response}") + elif hasattr(response, 'status_code') and response.status_code >= 400: + logger.error(f"Error response in indexing task {i}: {response.status_code}, {await response.text()}") + + logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints") + else: + logger.warning(f"No content to index for shout {shout.id}") + + except Exception as e: + logger.error(f"Indexing error for shout {shout.id}: {e}") + + async def bulk_index(self, shouts): + """Index multiple documents across three separate endpoints""" + if not self.available or not shouts: + logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}") + return + + start_time = time.time() + logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents") + + # Prepare documents for different endpoints + title_docs = [] + body_docs = [] + author_docs = {} # Use dict to prevent duplicate authors + + total_skipped = 0 + + for shout in shouts: + try: + # 1. Process title documents + if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str): + title_docs.append({ + "id": str(shout.id), + "title": shout.title.strip() + }) + + # 2. Process body documents (subtitle, lead, body) + body_text_parts = [] + for field_name in ['subtitle', 'lead', 'body']: + field_value = getattr(shout, field_name, None) + if field_value and isinstance(field_value, str) and field_value.strip(): + body_text_parts.append(field_value.strip()) + + # Process media content if available + media = getattr(shout, 'media', None) + if media: + if isinstance(media, str): + try: + media_json = json.loads(media) + if isinstance(media_json, dict): + if 'title' in media_json: + body_text_parts.append(media_json['title']) + if 'body' in media_json: + body_text_parts.append(media_json['body']) + except json.JSONDecodeError: + body_text_parts.append(media) + elif isinstance(media, dict): + if 'title' in media: + body_text_parts.append(media['title']) + if 'body' in media: + body_text_parts.append(media['body']) + + # Only add body document if we have body text + if body_text_parts: + body_text = " ".join(body_text_parts) + # Truncate if too long + MAX_TEXT_LENGTH = 4000 + if len(body_text) > MAX_TEXT_LENGTH: + body_text = body_text[:MAX_TEXT_LENGTH] + + body_docs.append({ + "id": str(shout.id), + "body": body_text + }) + + # 3. Process authors if available + authors = getattr(shout, 'authors', []) + for author in authors: + author_id = str(getattr(author, 'id', 0)) + if not author_id or author_id == '0': + continue + + # Skip if we've already processed this author + if author_id in author_docs: + continue + + name = getattr(author, 'name', '') + + # Combine bio and about fields + bio_parts = [] + bio = getattr(author, 'bio', '') + if bio and isinstance(bio, str): + bio_parts.append(bio.strip()) + + about = getattr(author, 'about', '') + if about and isinstance(about, str): + bio_parts.append(about.strip()) + + combined_bio = " ".join(bio_parts) + + # Only add if we have author data + if name: + author_docs[author_id] = { + "id": author_id, + "name": name, + "bio": combined_bio + } + + except Exception as e: + logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}") + total_skipped += 1 + + # Convert author dict to list + author_docs_list = list(author_docs.values()) + + # Process each endpoint in parallel + indexing_tasks = [ + self._index_endpoint(title_docs, "/bulk-index-titles", "title"), + self._index_endpoint(body_docs, "/bulk-index-bodies", "body"), + self._index_endpoint(author_docs_list, "/bulk-index-authors", "author") + ] + + await asyncio.gather(*indexing_tasks) + + elapsed = time.time() - start_time + logger.info( + f"Multi-endpoint indexing completed in {elapsed:.2f}s: " + f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, " + f"{total_skipped} shouts skipped" + ) + + async def _index_endpoint(self, documents, endpoint, doc_type): + """Process and index documents to a specific endpoint""" + if not documents: + logger.info(f"No {doc_type} documents to index") + return + + logger.info(f"Indexing {len(documents)} {doc_type} documents") + + # Categorize documents by size + small_docs, medium_docs, large_docs = self._categorize_by_size(documents, doc_type) + + # Process each category with appropriate batch sizes + batch_sizes = { + "small": min(MAX_BATCH_SIZE, 15), + "medium": min(MAX_BATCH_SIZE, 10), + "large": min(MAX_BATCH_SIZE, 3) + } + + for category, docs in [("small", small_docs), ("medium", medium_docs), ("large", large_docs)]: + if docs: + batch_size = batch_sizes[category] + await self._process_batches(docs, batch_size, endpoint, f"{doc_type}-{category}") + + def _categorize_by_size(self, documents, doc_type): + """Categorize documents by size for optimized batch processing""" + small_docs = [] + medium_docs = [] + large_docs = [] + + for doc in documents: + # Extract relevant text based on document type + if doc_type == "title": + text = doc.get("title", "") + elif doc_type == "body": + text = doc.get("body", "") + else: # author + # For authors, consider both name and bio length + text = doc.get("name", "") + " " + doc.get("bio", "") + + text_len = len(text) + + if text_len > 5000: + large_docs.append(doc) + elif text_len > 2000: + medium_docs.append(doc) + else: + small_docs.append(doc) + + logger.info(f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large") + return small_docs, medium_docs, large_docs + + async def _process_batches(self, documents, batch_size, endpoint, batch_prefix): + """Process document batches with retry logic""" + for i in range(0, len(documents), batch_size): + batch = documents[i:i+batch_size] + batch_id = f"{batch_prefix}-{i//batch_size + 1}" + + retry_count = 0 + max_retries = 3 + success = False + + while not success and retry_count < max_retries: + try: + logger.info(f"Sending batch {batch_id} ({len(batch)} docs) to {endpoint}") + response = await self.index_client.post( + endpoint, + json=batch, + timeout=90.0 + ) + + if response.status_code == 422: + error_detail = response.json() + logger.error(f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}") + break + + response.raise_for_status() + success = True + logger.info(f"Successfully indexed batch {batch_id}") + + except Exception as e: + retry_count += 1 + if retry_count >= max_retries: + if len(batch) > 1: + mid = len(batch) // 2 + logger.warning(f"Splitting batch {batch_id} into smaller batches for retry") + await self._process_batches(batch[:mid], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-A") + await self._process_batches(batch[mid:], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-B") + else: + logger.error(f"Failed to index single document in batch {batch_id} after {max_retries} attempts: {str(e)}") + break + + wait_time = (2 ** retry_count) + (random.random() * 0.5) + logger.warning(f"Retrying batch {batch_id} in {wait_time:.1f}s... (attempt {retry_count+1}/{max_retries})") + await asyncio.sleep(wait_time) + + def _truncate_error_detail(self, error_detail): + """Truncate error details for logging""" + truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail + + if isinstance(truncated_detail, dict) and 'detail' in truncated_detail and isinstance(truncated_detail['detail'], list): + for i, item in enumerate(truncated_detail['detail']): + if isinstance(item, dict) and 'input' in item: + if isinstance(item['input'], dict) and any(k in item['input'] for k in ['documents', 'text']): + if 'documents' in item['input'] and isinstance(item['input']['documents'], list): + for j, doc in enumerate(item['input']['documents']): + if 'text' in doc and isinstance(doc['text'], str) and len(doc['text']) > 100: + item['input']['documents'][j]['text'] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]" + + if 'text' in item['input'] and isinstance(item['input']['text'], str) and len(item['input']['text']) > 100: + item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]" + + return truncated_detail + #******************* + # Specialized search methods for titles, bodies, and authors + + async def search_titles(self, text, limit=10, offset=0): + """Search only in titles using the specialized endpoint""" + if not self.available or not text.strip(): + return [] + + cache_key = f"title:{text}" + + # Try cache first if enabled + if SEARCH_CACHE_ENABLED: + if await self.cache.has_query(cache_key): + return await self.cache.get(cache_key, limit, offset) + + try: + logger.info(f"Searching titles for: '{text}' (limit={limit}, offset={offset})") + response = await self.client.post( + "/search-title", + json={"text": text, "limit": limit + offset} + ) + response.raise_for_status() + + result = response.json() + title_results = result.get("results", []) + + # Apply score filtering if needed + if SEARCH_MIN_SCORE > 0: + title_results = [r for r in title_results if r.get("score", 0) >= SEARCH_MIN_SCORE] + + # Store in cache if enabled + if SEARCH_CACHE_ENABLED: + await self.cache.store(cache_key, title_results) + + # Apply offset/limit (API might not support it directly) + return title_results[offset:offset+limit] + + except Exception as e: + logger.error(f"Error searching titles for '{text}': {e}") + return [] + + async def search_bodies(self, text, limit=10, offset=0): + """Search only in document bodies using the specialized endpoint""" + if not self.available or not text.strip(): + return [] + + cache_key = f"body:{text}" + + # Try cache first if enabled + if SEARCH_CACHE_ENABLED: + if await self.cache.has_query(cache_key): + return await self.cache.get(cache_key, limit, offset) + + try: + logger.info(f"Searching bodies for: '{text}' (limit={limit}, offset={offset})") + response = await self.client.post( + "/search-body", + json={"text": text, "limit": limit + offset} + ) + response.raise_for_status() + + result = response.json() + body_results = result.get("results", []) + + # Apply score filtering if needed + if SEARCH_MIN_SCORE > 0: + body_results = [r for r in body_results if r.get("score", 0) >= SEARCH_MIN_SCORE] + + # Store in cache if enabled + if SEARCH_CACHE_ENABLED: + await self.cache.store(cache_key, body_results) + + # Apply offset/limit + return body_results[offset:offset+limit] + + except Exception as e: + logger.error(f"Error searching bodies for '{text}': {e}") + return [] + + async def search_authors(self, text, limit=10, offset=0): + """Search only for authors using the specialized endpoint""" + if not self.available or not text.strip(): + return [] + + cache_key = f"author:{text}" + + # Try cache first if enabled + if SEARCH_CACHE_ENABLED: + if await self.cache.has_query(cache_key): + return await self.cache.get(cache_key, limit, offset) + + try: + logger.info(f"Searching authors for: '{text}' (limit={limit}, offset={offset})") + response = await self.client.post( + "/search-author", + json={"text": text, "limit": limit + offset} + ) + response.raise_for_status() + + result = response.json() + author_results = result.get("results", []) + + # Apply score filtering if needed + if SEARCH_MIN_SCORE > 0: + author_results = [r for r in author_results if r.get("score", 0) >= SEARCH_MIN_SCORE] + + # Store in cache if enabled + if SEARCH_CACHE_ENABLED: + await self.cache.store(cache_key, author_results) + + # Apply offset/limit + return author_results[offset:offset+limit] + + except Exception as e: + logger.error(f"Error searching authors for '{text}': {e}") + return [] + + async def search(self, text, limit, offset): + """ + Legacy search method that searches only bodies for backward compatibility. + Consider using the specialized search methods instead. + """ + logger.warning("Using deprecated search() method - consider using search_bodies(), search_titles(), or search_authors()") + return await self.search_bodies(text, limit, offset) + + async def check_index_status(self): + """Get detailed statistics about the search index health""" + if not self.available: + return {"status": "disabled"} + + try: + response = await self.client.get("/index-status") + response.raise_for_status() + result = response.json() + + if result.get("consistency", {}).get("status") != "ok": + null_count = result.get("consistency", {}).get("null_embeddings_count", 0) + if null_count > 0: + logger.warning(f"Found {null_count} documents with NULL embeddings") + + return result + except Exception as e: + logger.error(f"Failed to check index status: {e}") + return {"status": "error", "message": str(e)} + +# Create the search service singleton search_service = SearchService() +# API-compatible function to perform a search -async def search_text(text: str, limit: int = 50, offset: int = 0): - payload = [] - if search_service.client: - # Использование метода search_post из OpenSearchService - payload = await search_service.search(text, limit, offset) - return payload +async def search_title_text(text: str, limit: int = 10, offset: int = 0): + """Search titles API helper function""" + if search_service.available: + return await search_service.search_titles(text, limit, offset) + return [] +async def search_body_text(text: str, limit: int = 10, offset: int = 0): + """Search bodies API helper function""" + if search_service.available: + return await search_service.search_bodies(text, limit, offset) + return [] -# Проверить что URL корректный -OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "rc1a-3n5pi3bhuj9gieel.mdb.yandexcloud.net") +async def search_author_text(text: str, limit: int = 10, offset: int = 0): + """Search authors API helper function""" + if search_service.available: + return await search_service.search_authors(text, limit, offset) + return [] + +async def get_title_search_count(text: str): + """Get count of title search results""" + if not search_service.available: + return 0 + + if SEARCH_CACHE_ENABLED: + cache_key = f"title:{text}" + if await search_service.cache.has_query(cache_key): + return await search_service.cache.get_total_count(cache_key) + + # If not found in cache, fetch from endpoint + return len(await search_title_text(text, SEARCH_PREFETCH_SIZE, 0)) + +async def get_body_search_count(text: str): + """Get count of body search results""" + if not search_service.available: + return 0 + + if SEARCH_CACHE_ENABLED: + cache_key = f"body:{text}" + if await search_service.cache.has_query(cache_key): + return await search_service.cache.get_total_count(cache_key) + + # If not found in cache, fetch from endpoint + return len(await search_body_text(text, SEARCH_PREFETCH_SIZE, 0)) + +async def get_author_search_count(text: str): + """Get count of author search results""" + if not search_service.available: + return 0 + + if SEARCH_CACHE_ENABLED: + cache_key = f"author:{text}" + if await search_service.cache.has_query(cache_key): + return await search_service.cache.get_total_count(cache_key) + + # If not found in cache, fetch from endpoint + return len(await search_author_text(text, SEARCH_PREFETCH_SIZE, 0)) + +async def initialize_search_index(shouts_data): + """Initialize search index with existing data during application startup""" + if not SEARCH_ENABLED: + return + + if not shouts_data: + return + + info = await search_service.info() + if info.get("status") in ["error", "unavailable", "disabled"]: + return + + index_stats = info.get("index_stats", {}) + indexed_doc_count = index_stats.get("total_count", 0) + + index_status = await search_service.check_index_status() + if index_status.get("status") == "inconsistent": + problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", []) + + if problem_ids: + problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids] + if problem_docs: + await search_service.bulk_index(problem_docs) + + db_ids = [str(shout.id) for shout in shouts_data] + + try: + numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()] + if numeric_ids: + min_id = min(numeric_ids) + max_id = max(numeric_ids) + id_range = max_id - min_id + 1 + except Exception as e: + pass + + if abs(indexed_doc_count - len(shouts_data)) > 10: + doc_ids = [str(shout.id) for shout in shouts_data] + + verification = await search_service.verify_docs(doc_ids) + + if verification.get("status") == "error": + return + + missing_ids = verification.get("missing", []) + if missing_ids: + missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids] + await search_service.bulk_index(missing_docs) + else: + pass + + try: + test_query = "test" + # Use body search since that's most likely to return results + test_results = await search_body_text(test_query, 5) + + if test_results: + categories = set() + for result in test_results: + result_id = result.get("id") + matching_shouts = [s for s in shouts_data if str(s.id) == result_id] + if matching_shouts and hasattr(matching_shouts[0], 'category'): + categories.add(getattr(matching_shouts[0], 'category', 'unknown')) + except Exception as e: + pass