core/services/search.py

225 lines
9.0 KiB
Python
Raw Normal View History

2024-02-29 11:04:24 +00:00
import asyncio
2022-11-17 19:53:58 +00:00
import json
2024-06-02 13:36:12 +00:00
import logging
2024-06-02 14:01:22 +00:00
import os
2023-12-17 20:30:20 +00:00
2024-01-29 02:00:54 +00:00
from opensearchpy import OpenSearch
2023-12-17 20:30:20 +00:00
2024-08-07 06:51:09 +00:00
from services.redis import redis
2024-08-09 06:37:06 +00:00
from utils.encoders import CustomJSONEncoder
2022-10-04 00:32:29 +00:00
2024-06-02 13:36:12 +00:00
# Suppress DEBUG messages from the search logger.
# NOTE(review): the old comment said "redis logging level", but this
# configures the "search" logger — the comment was wrong, not the code.
logger = logging.getLogger("search")
logger.setLevel(logging.WARNING)

# OpenSearch connection settings, read from the environment.
# Any "https://" prefix is stripped so the value is a bare hostname.
ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "")
ELASTIC_USER = os.environ.get("ELASTIC_USER", "")
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "")
# NOTE(review): default is an int (9200) while an env-provided value is a
# str — both reach the client's hosts dict and the f-string below; confirm
# the client accepts a string port.
ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200)

# Full connection URL; assembled from the parts above unless overridden.
ELASTIC_URL = os.environ.get(
    "ELASTIC_URL",
    f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}",
)

REDIS_TTL = 86400  # 1 day in seconds
2024-01-29 08:09:10 +00:00
# Index definition: a single shard with replicas auto-expanded to every
# node, and a Russian analyzer (lowercase + stopword removal + stemming)
# applied to all searchable text fields.
index_settings = {
    "settings": {
        "index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"},
        "analysis": {
            "analyzer": {
                "ru": {
                    "tokenizer": "standard",
                    "filter": ["lowercase", "ru_stop", "ru_stemmer"],
                }
            },
            "filter": {
                "ru_stemmer": {"type": "stemmer", "language": "russian"},
                "ru_stop": {"type": "stop", "stopwords": "_russian_"},
            },
        },
    },
    "mappings": {
        "properties": {
            "body": {"type": "text", "analyzer": "ru"},
            "title": {"type": "text", "analyzer": "ru"},
            "subtitle": {"type": "text", "analyzer": "ru"},
            "lead": {"type": "text", "analyzer": "ru"},
            "media": {"type": "text", "analyzer": "ru"},
        }
    },
}

# The mapping that SearchService.check_index() compares against the live
# index; a mismatch triggers deletion of the index for reindexing.
expected_mapping = index_settings["mappings"]

# Event loop used to schedule the index check at import time.
# NOTE(review): calling asyncio.get_event_loop() at module import with no
# running loop is deprecated since Python 3.10 — confirm the runtime
# version and how this module is imported relative to loop startup.
search_loop = asyncio.get_event_loop()
2024-05-18 09:48:43 +00:00
2024-05-18 08:52:17 +00:00
def get_indices_stats():
    """Log health and size statistics for every non-system index.

    Iterates the output of ``cat.indices`` on the module-level
    ``search_service`` client and writes one info line per field.
    System indices (names starting with a dot) are skipped.
    """
    for info in search_service.client.cat.indices(format="json"):
        index_name = info["index"]
        # System/internal indices are prefixed with "." — skip them.
        if index_name.startswith("."):
            continue
        logger.info(f"Index: {index_name}")
        logger.info(f"Health: {info['health']}")
        logger.info(f"Status: {info['status']}")
        logger.info(f"Primary Shards: {info['pri']}")
        logger.info(f"Replica Shards: {info['rep']}")
        logger.info(f"Documents Count: {info['docs.count']}")
        logger.info(f"Deleted Documents: {info['docs.deleted']}")
        logger.info(f"Store Size: {info['store.size']}")
        logger.info(f"Primary Store Size: {info['pri.store.size']}")
2024-02-29 11:09:50 +00:00
2024-01-29 01:09:54 +00:00
class SearchService:
    """Thin wrapper around a single OpenSearch full-text index.

    The client is created eagerly in ``__init__`` when ``ELASTIC_HOST`` is
    set; when it is not (or connection fails), ``self.client`` stays
    ``None`` and every operation degrades to a logged no-op.
    """

    def __init__(self, index_name="search_index"):
        """Connect to OpenSearch (if configured) and schedule an index check.

        :param index_name: name of the index all operations target.
        """
        logger.info("Инициализируем поиск...")
        self.index_name = index_name
        self.client = None
        self.lock = asyncio.Lock()

        # Initialize the OpenSearch client only when a host is configured.
        if ELASTIC_HOST:
            try:
                self.client = OpenSearch(
                    hosts=[{"host": ELASTIC_HOST, "port": ELASTIC_PORT}],
                    http_compress=True,
                    http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
                    use_ssl=True,
                    # NOTE(review): certificate verification is disabled —
                    # confirm this is intentional for the deployment.
                    verify_certs=False,
                    ssl_assert_hostname=False,
                    ssl_show_warn=False,
                    # ca_certs = ca_certs_path
                )
                logger.info("Клиент OpenSearch.org подключен")

                # Schedule the mapping check on the module-level event loop.
                search_loop.create_task(self.check_index())
            except Exception as exc:
                logger.error(f"Ошибка подключения к OpenSearch: {exc}")
                self.client = None
        else:
            logger.warning("env var ELASTIC_HOST is not set")

    async def info(self):
        """Log index statistics via :func:`get_indices_stats`.

        Returns ``None`` either way — ``get_indices_stats`` only logs.
        """
        try:
            return get_indices_stats()
        except ConnectionError as e:
            logger.error(f"Failed to connect to OpenSearch: {e}")
            return

    def delete_index(self):
        """Delete the index (no-op when the client is not initialized)."""
        if self.client:
            logger.warning(f"[!!!] Удаляем индекс {self.index_name}")
            self.client.indices.delete(index=self.index_name, ignore_unavailable=True)

    def create_index(self):
        """Create the index with the module-level ``index_settings``."""
        if self.client:
            logger.info(f"Создается индекс: {self.index_name}")
            self.client.indices.create(index=self.index_name, body=index_settings)
            logger.info(f"Индекс {self.index_name} создан")

    async def check_index(self):
        """Ensure the index exists and its mapping matches expectations.

        When the live mapping's property keys diverge from
        ``expected_mapping``, the index is deleted and the client disabled
        so that all data can be reindexed.
        """
        if self.client:
            logger.info(f"Проверяем индекс {self.index_name}...")
            if not self.client.indices.exists(index=self.index_name):
                self.create_index()
                self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
            else:
                logger.info(f"Найден существующий индекс {self.index_name}")
                # Compare the live mapping with the expected structure.
                result = self.client.indices.get_mapping(index=self.index_name)
                if isinstance(result, str):
                    result = json.loads(result)
                if isinstance(result, dict):
                    mapping = result.get(self.index_name, {}).get("mappings")
                    # BUGFIX: guard *before* touching mapping["properties"];
                    # previously a missing/empty mapping raised TypeError on
                    # the log line below.
                    if mapping:
                        logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}")
                        expected_keys = expected_mapping["properties"].keys()
                        if mapping["properties"].keys() != expected_keys:
                            logger.info(f"Ожидаемая структура индексации: {expected_mapping}")
                            logger.warning("[!!!] Требуется переиндексация всех данных")
                            self.delete_index()
                            self.client = None
        else:
            logger.error("клиент не инициализован, невозможно проверить индекс")

    def index(self, shout):
        """Fire-and-forget indexing of one shout.

        Builds the document body from the shout's text fields and schedules
        :meth:`perform_index` as a task. NOTE: ``asyncio.create_task``
        requires a running event loop — callers must be in async context.
        """
        if self.client:
            logger.info(f"Индексируем пост {shout.id}")
            index_body = {
                "body": shout.body,
                "title": shout.title,
                "subtitle": shout.subtitle,
                "lead": shout.lead,
                "media": shout.media,
            }
            asyncio.create_task(self.perform_index(shout, index_body))
        else:
            logger.error("клиент не инициализован, невозможно проидексировать")

    async def perform_index(self, shout, index_body):
        """Index one document with a 40-second timeout.

        The OpenSearch client imported here is synchronous, so the call is
        run in the default executor. BUGFIX: the previous code passed the
        *dict* returned by ``client.index(...)`` directly to
        ``asyncio.wait_for``, which requires an awaitable — every call
        raised TypeError and was logged as an "Indexing error" (the
        document was still indexed, synchronously, blocking the loop).
        """
        if self.client:
            try:
                loop = asyncio.get_event_loop()
                await asyncio.wait_for(
                    loop.run_in_executor(
                        None,
                        lambda: self.client.index(index=self.index_name, id=str(shout.id), body=index_body),
                    ),
                    timeout=40.0,
                )
            except asyncio.TimeoutError:
                logger.error(f"Indexing timeout for shout {shout.id}")
            except Exception as e:
                logger.error(f"Indexing error for shout {shout.id}: {e}")

    async def search(self, text, limit, offset):
        """Run a multi-field full-text query.

        :param text: query string, matched against title/lead/subtitle/body/media.
        :param limit: page size.
        :param offset: page start.
        :returns: list of ``{"id", "score"}`` dicts; ``[]`` when the client
            is not initialized. Non-empty result pages are cached in Redis
            with a one-day TTL.
        """
        logger.info(f"Ищем: {text} {offset}+{limit}")
        search_body = {
            "query": {"multi_match": {"query": text, "fields": ["title", "lead", "subtitle", "body", "media"]}}
        }
        if self.client:
            search_response = self.client.search(
                index=self.index_name,
                body=search_body,
                size=limit,
                from_=offset,
                # Only ids and scores are consumed; document bodies are not
                # fetched.
                _source=False,
                _source_excludes=["title", "body", "subtitle", "media", "lead", "_index"],
            )
            hits = search_response["hits"]["hits"]
            results = [{"id": hit["_id"], "score": hit["_score"]} for hit in hits]

            # Cache non-empty result sets in Redis with TTL.
            if results:
                redis_key = f"search:{text}:{offset}+{limit}"
                await redis.execute(
                    "SETEX",
                    redis_key,
                    REDIS_TTL,
                    json.dumps(results, cls=CustomJSONEncoder),
                )
            return results
        return []
2024-01-29 00:27:30 +00:00
2024-02-29 11:09:50 +00:00
2024-01-29 03:42:02 +00:00
# Module-level singleton; connecting (and the index check) happens at
# import time inside SearchService.__init__.
search_service = SearchService()
2024-01-29 01:41:46 +00:00
2024-02-29 11:09:50 +00:00
2024-01-29 01:41:46 +00:00
async def search_text(text: str, limit: int = 50, offset: int = 0):
    """Module-level search entry point.

    Delegates to ``search_service.search`` (which also caches results in
    Redis); returns ``[]`` when the search client is not initialized.
    """
    if not search_service.client:
        return []
    return await search_service.search(text, limit, offset)
2024-11-22 17:23:45 +00:00
# Check that this URL is correct.
# NOTE(review): stray trailing assignment — OPENSEARCH_URL is not
# referenced anywhere in this file; verify it is consumed elsewhere or
# whether this line was committed by mistake.
OPENSEARCH_URL = os.getenv('OPENSEARCH_URL', 'rc1a-3n5pi3bhuj9gieel.mdb.yandexcloud.net')