This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from multiprocessing import Lock
|
||||
|
||||
from opensearchpy import OpenSearch
|
||||
|
||||
@@ -24,8 +25,8 @@ ELASTIC_URL = os.environ.get(
|
||||
)
|
||||
REDIS_TTL = 86400 # 1 day in seconds
|
||||
|
||||
|
||||
class SearchService:
|
||||
lock = Lock()
|
||||
def __init__(self, index_name='posts'):
|
||||
logger.info('initialized')
|
||||
self.index_name = index_name
|
||||
@@ -59,7 +60,8 @@ class SearchService:
|
||||
logger.error(f'Error while listing indices: {e}')
|
||||
|
||||
def delete_index(self):
|
||||
self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
|
||||
if not self.disabled:
|
||||
self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
|
||||
|
||||
def create_index(self):
|
||||
index_settings = {
|
||||
@@ -96,11 +98,13 @@ class SearchService:
|
||||
},
|
||||
}
|
||||
try:
|
||||
self.client.indices.create(index=self.index_name, body=index_settings)
|
||||
self.client.indices.close(index=self.index_name)
|
||||
self.client.indices.open(index=self.index_name)
|
||||
with self.lock:
|
||||
self.client.indices.create(index=self.index_name, body=index_settings)
|
||||
self.client.indices.close(index=self.index_name)
|
||||
self.client.indices.open(index=self.index_name)
|
||||
except Exception as error:
|
||||
logger.warn(error)
|
||||
self.disabled = True
|
||||
|
||||
def put_mapping(self):
|
||||
mapping = {
|
||||
@@ -114,7 +118,7 @@ class SearchService:
|
||||
self.client.indices.put_mapping(index=self.index_name, body=mapping)
|
||||
|
||||
def check_index(self):
|
||||
if not self.client.indices.exists(index=self.index_name):
|
||||
if not self.client.indices.exists(index=self.index_name) and not self.disabled:
|
||||
logger.debug(f'Creating {self.index_name} index')
|
||||
self.create_index()
|
||||
self.put_mapping()
|
||||
@@ -135,13 +139,15 @@ class SearchService:
|
||||
self.recreate_index()
|
||||
|
||||
def recreate_index(self):
|
||||
self.delete_index()
|
||||
self.check_index()
|
||||
with self.lock:
|
||||
self.delete_index()
|
||||
self.check_index()
|
||||
|
||||
def index_post(self, shout):
|
||||
id_ = str(shout.id)
|
||||
logger.debug(f'Indexing post id {id_}')
|
||||
self.client.index(index=self.index_name, id=id_, body=shout)
|
||||
if not not self.disabled:
|
||||
id_ = str(shout.id)
|
||||
logger.debug(f'Indexing post id {id_}')
|
||||
self.client.index(index=self.index_name, id=id_, body=shout)
|
||||
|
||||
def search_post(self, query, limit, offset):
|
||||
logger.debug(f'query: {query}')
|
||||
|
Reference in New Issue
Block a user