From f07fd646d35d3831223cb2158222125b8ffea924 Mon Sep 17 00:00:00 2001 From: Untone Date: Thu, 29 Feb 2024 14:04:24 +0300 Subject: [PATCH] indexing-fix --- services/search.py | 81 ++++++++++++++-------------------------------- 1 file changed, 24 insertions(+), 57 deletions(-) diff --git a/services/search.py b/services/search.py index e785ee16..16e704b8 100644 --- a/services/search.py +++ b/services/search.py @@ -1,7 +1,6 @@ +import asyncio import json import os -from multiprocessing import Manager -import threading from opensearchpy import OpenSearch @@ -18,7 +17,6 @@ ELASTIC_URL = os.environ.get( ) REDIS_TTL = 86400 # 1 day in seconds - index_settings = { 'settings': { 'index': {'number_of_shards': 1, 'auto_expand_replicas': '0-all'}, @@ -48,19 +46,13 @@ index_settings = { expected_mapping = index_settings['mappings'] - class SearchService: def __init__(self, index_name='search_index'): self.index_name = index_name - self.manager = Manager() self.client = None - # Используем менеджер для создания Lock и Value - self.lock = threading.Lock() - self.initialized_flag = self.manager.Value('i', 0) - # Only initialize the instance if it's not already initialized - if not self.initialized_flag.value and ELASTIC_HOST: + if ELASTIC_HOST: try: self.client = OpenSearch( hosts=[{'host': ELASTIC_HOST, 'port': ELASTIC_PORT}], @@ -73,13 +65,7 @@ class SearchService: # ca_certs = ca_certs_path ) logger.info(' Клиент OpenSearch.org подключен') - if self.lock.acquire(blocking=False): - try: - self.check_index() - finally: - self.lock.release() - else: - logger.debug(' проверка пропущена') + asyncio.create_task(self.check_index()) except Exception as exc: logger.error(f' {exc}') self.client = None @@ -90,69 +76,52 @@ class SearchService: else: logger.info(' * Задайте переменные среды для подключения к серверу поиска') - def delete_index(self): + async def delete_index(self): if self.client: logger.debug(f' Удаляем индекс {self.index_name}') - self.client.indices.delete(index=self.index_name, ignore_unavailable=True) + await self.client.indices.delete(index=self.index_name, ignore_unavailable=True) - def create_index(self): + async def create_index(self): if self.client: - if self.lock.acquire(blocking=False): - try: - logger.debug(f'Создается индекс: {self.index_name}') - self.delete_index() - self.check_index() - logger.debug(f'Индексс {self.index_name} создан') - except Exception as e: - logger.debug(f'Ошибка создания индекса: {str(e)}') - finally: - self.lock.release() - else: - logger.error('Не получается создать индекс') + logger.debug(f'Создается индекс: {self.index_name}') + await self.delete_index() + await self.client.indices.create(index=self.index_name, body=index_settings) + logger.debug(f'Индекс {self.index_name} создан') - def put_mapping(self): + async def put_mapping(self): if self.client: logger.debug(f' Разметка индекации {self.index_name}') - self.client.indices.put_mapping( + await self.client.indices.put_mapping( index=self.index_name, body=expected_mapping ) - def check_index(self): + async def check_index(self): if self.client: - if not self.client.indices.exists(index=self.index_name): - self.create_index() - self.put_mapping() + if not await self.client.indices.exists(index=self.index_name): + await self.create_index() + await self.put_mapping() else: # Check if the mapping is correct, and recreate the index if needed - mapping = self.client.indices.get_mapping(index=self.index_name) + mapping = await self.client.indices.get_mapping(index=self.index_name) if mapping != expected_mapping: - self.recreate_index() + await self.recreate_index() - def recreate_index(self): - thread = threading.Thread(target=self._recreate_index) - thread.start() - - def _recreate_index(self): - if self.lock.acquire(blocking=False): - try: - self.delete_index() - self.check_index() - finally: - self.lock.release() - else: - logger.debug(' не удалось проиндексировать') + async def recreate_index(self): + async with asyncio.Lock(): + await self.delete_index() + await self.check_index() def index(self, shout): if self.client: id_ = str(shout.id) logger.debug(f' Индексируем пост {id_}') - self.client.index(index=self.index_name, id=id_, body=shout.dict()) + asyncio.create_task(self.client.index(index=self.index_name, id=id_, body=shout.dict())) async def search(self, text, limit, offset): logger.debug(f' Ищем: {text}') search_body = {'query': {'match': {'_all': text}}} if self.client: - search_response = self.client.search( + search_response = await self.client.search( index=self.index_name, body=search_body, size=limit, from_=offset ) hits = search_response['hits']['hits'] @@ -164,10 +133,8 @@ class SearchService: await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(results)) return [] - search_service = SearchService() - async def search_text(text: str, limit: int = 50, offset: int = 0): payload = [] if search_service.client: