This commit is contained in:
parent
56bf5b2874
commit
5002e85177
|
@ -11,7 +11,7 @@ from services.auth import login_required
|
||||||
from services.db import local_session
|
from services.db import local_session
|
||||||
from services.notify import notify_shout
|
from services.notify import notify_shout
|
||||||
from services.schema import mutation, query
|
from services.schema import mutation, query
|
||||||
from services.search import search
|
from services.search import search_service
|
||||||
|
|
||||||
|
|
||||||
@query.field('get_shouts_drafts')
|
@query.field('get_shouts_drafts')
|
||||||
|
@ -73,7 +73,11 @@ async def create_shout(_, info, inp):
|
||||||
sa = ShoutAuthor(shout=shout.id, author=author.id)
|
sa = ShoutAuthor(shout=shout.id, author=author.id)
|
||||||
session.add(sa)
|
session.add(sa)
|
||||||
|
|
||||||
topics = session.query(Topic).filter(Topic.slug.in_(inp.get('topics', []))).all()
|
topics = (
|
||||||
|
session.query(Topic)
|
||||||
|
.filter(Topic.slug.in_(inp.get('topics', [])))
|
||||||
|
.all()
|
||||||
|
)
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
t = ShoutTopic(topic=topic.id, shout=shout.id)
|
t = ShoutTopic(topic=topic.id, shout=shout.id)
|
||||||
session.add(t)
|
session.add(t)
|
||||||
|
@ -114,7 +118,9 @@ async def update_shout( # noqa: C901
|
||||||
topics_input = shout_input['topics']
|
topics_input = shout_input['topics']
|
||||||
del shout_input['topics']
|
del shout_input['topics']
|
||||||
new_topics_to_link = []
|
new_topics_to_link = []
|
||||||
new_topics = [topic_input for topic_input in topics_input if topic_input['id'] < 0]
|
new_topics = [
|
||||||
|
topic_input for topic_input in topics_input if topic_input['id'] < 0
|
||||||
|
]
|
||||||
for new_topic in new_topics:
|
for new_topic in new_topics:
|
||||||
del new_topic['id']
|
del new_topic['id']
|
||||||
created_new_topic = Topic(**new_topic)
|
created_new_topic = Topic(**new_topic)
|
||||||
|
@ -123,21 +129,31 @@ async def update_shout( # noqa: C901
|
||||||
if len(new_topics) > 0:
|
if len(new_topics) > 0:
|
||||||
session.commit()
|
session.commit()
|
||||||
for new_topic_to_link in new_topics_to_link:
|
for new_topic_to_link in new_topics_to_link:
|
||||||
created_unlinked_topic = ShoutTopic(shout=shout.id, topic=new_topic_to_link.id)
|
created_unlinked_topic = ShoutTopic(
|
||||||
|
shout=shout.id, topic=new_topic_to_link.id
|
||||||
|
)
|
||||||
session.add(created_unlinked_topic)
|
session.add(created_unlinked_topic)
|
||||||
existing_topics_input = [topic_input for topic_input in topics_input if topic_input.get('id', 0) > 0]
|
existing_topics_input = [
|
||||||
|
topic_input
|
||||||
|
for topic_input in topics_input
|
||||||
|
if topic_input.get('id', 0) > 0
|
||||||
|
]
|
||||||
existing_topic_to_link_ids = [
|
existing_topic_to_link_ids = [
|
||||||
existing_topic_input['id']
|
existing_topic_input['id']
|
||||||
for existing_topic_input in existing_topics_input
|
for existing_topic_input in existing_topics_input
|
||||||
if existing_topic_input['id'] not in [topic.id for topic in shout.topics]
|
if existing_topic_input['id']
|
||||||
|
not in [topic.id for topic in shout.topics]
|
||||||
]
|
]
|
||||||
for existing_topic_to_link_id in existing_topic_to_link_ids:
|
for existing_topic_to_link_id in existing_topic_to_link_ids:
|
||||||
created_unlinked_topic = ShoutTopic(shout=shout.id, topic=existing_topic_to_link_id)
|
created_unlinked_topic = ShoutTopic(
|
||||||
|
shout=shout.id, topic=existing_topic_to_link_id
|
||||||
|
)
|
||||||
session.add(created_unlinked_topic)
|
session.add(created_unlinked_topic)
|
||||||
topic_to_unlink_ids = [
|
topic_to_unlink_ids = [
|
||||||
topic.id
|
topic.id
|
||||||
for topic in shout.topics
|
for topic in shout.topics
|
||||||
if topic.id not in [topic_input['id'] for topic_input in existing_topics_input]
|
if topic.id
|
||||||
|
not in [topic_input['id'] for topic_input in existing_topics_input]
|
||||||
]
|
]
|
||||||
shout_topics_to_remove = session.query(ShoutTopic).filter(
|
shout_topics_to_remove = session.query(ShoutTopic).filter(
|
||||||
and_(
|
and_(
|
||||||
|
@ -149,7 +165,9 @@ async def update_shout( # noqa: C901
|
||||||
session.delete(shout_topic_to_remove)
|
session.delete(shout_topic_to_remove)
|
||||||
|
|
||||||
# Replace datetime with Unix timestamp
|
# Replace datetime with Unix timestamp
|
||||||
shout_input['updated_at'] = current_time # Set updated_at as Unix timestamp
|
shout_input[
|
||||||
|
'updated_at'
|
||||||
|
] = current_time # Set updated_at as Unix timestamp
|
||||||
Shout.update(shout, shout_input)
|
Shout.update(shout, shout_input)
|
||||||
session.add(shout)
|
session.add(shout)
|
||||||
|
|
||||||
|
@ -157,10 +175,16 @@ async def update_shout( # noqa: C901
|
||||||
if 'main_topic' in shout_input:
|
if 'main_topic' in shout_input:
|
||||||
old_main_topic = (
|
old_main_topic = (
|
||||||
session.query(ShoutTopic)
|
session.query(ShoutTopic)
|
||||||
.filter(and_(ShoutTopic.shout == shout.id, ShoutTopic.main == True))
|
.filter(
|
||||||
|
and_(ShoutTopic.shout == shout.id, ShoutTopic.main == True)
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
main_topic = (
|
||||||
|
session.query(Topic)
|
||||||
|
.filter(Topic.slug == shout_input['main_topic'])
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
main_topic = session.query(Topic).filter(Topic.slug == shout_input['main_topic']).first()
|
|
||||||
if isinstance(main_topic, Topic):
|
if isinstance(main_topic, Topic):
|
||||||
new_main_topic = (
|
new_main_topic = (
|
||||||
session.query(ShoutTopic)
|
session.query(ShoutTopic)
|
||||||
|
@ -187,9 +211,12 @@ async def update_shout( # noqa: C901
|
||||||
|
|
||||||
if not publish:
|
if not publish:
|
||||||
await notify_shout(shout_dict, 'update')
|
await notify_shout(shout_dict, 'update')
|
||||||
if shout.visibility is ShoutVisibility.COMMUNITY.value or shout.visibility is ShoutVisibility.PUBLIC.value:
|
if (
|
||||||
|
shout.visibility is ShoutVisibility.COMMUNITY.value
|
||||||
|
or shout.visibility is ShoutVisibility.PUBLIC.value
|
||||||
|
):
|
||||||
# search service indexing
|
# search service indexing
|
||||||
search.index_post(shout)
|
search_service.index(shout)
|
||||||
|
|
||||||
return {'shout': shout_dict}
|
return {'shout': shout_dict}
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,6 @@ class SearchService:
|
||||||
|
|
||||||
# Only initialize the instance if it's not already initialized
|
# Only initialize the instance if it's not already initialized
|
||||||
if not self.initialized_flag.value and ELASTIC_HOST:
|
if not self.initialized_flag.value and ELASTIC_HOST:
|
||||||
logger.info(' инициализация клиента OpenSearch.org')
|
|
||||||
try:
|
try:
|
||||||
self.client = OpenSearch(
|
self.client = OpenSearch(
|
||||||
hosts=[{'host': ELASTIC_HOST, 'port': ELASTIC_PORT}],
|
hosts=[{'host': ELASTIC_HOST, 'port': ELASTIC_PORT}],
|
||||||
|
@ -48,15 +47,12 @@ class SearchService:
|
||||||
ssl_show_warn=False,
|
ssl_show_warn=False,
|
||||||
# ca_certs = ca_certs_path
|
# ca_certs = ca_certs_path
|
||||||
)
|
)
|
||||||
|
logger.info(' Клиент OpenSearch.org подключен')
|
||||||
|
self.check_index()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error(exc)
|
logger.error(exc)
|
||||||
self.client = None
|
self.client = None
|
||||||
|
|
||||||
self.check_index()
|
|
||||||
else:
|
|
||||||
self.disabled = True
|
|
||||||
|
|
||||||
def info(self):
|
def info(self):
|
||||||
if self.client:
|
if self.client:
|
||||||
logger.info(f' {self.client}')
|
logger.info(f' {self.client}')
|
||||||
|
@ -96,8 +92,8 @@ class SearchService:
|
||||||
'mappings': {
|
'mappings': {
|
||||||
'properties': {
|
'properties': {
|
||||||
'body': {'type': 'text', 'analyzer': 'ru'},
|
'body': {'type': 'text', 'analyzer': 'ru'},
|
||||||
'text': {'type': 'text'},
|
'title': {'type': 'text', 'analyzer': 'ru'},
|
||||||
'author': {'type': 'text'},
|
# 'author': {'type': 'text'},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -116,8 +112,8 @@ class SearchService:
|
||||||
mapping = {
|
mapping = {
|
||||||
'properties': {
|
'properties': {
|
||||||
'body': {'type': 'text', 'analyzer': 'ru'},
|
'body': {'type': 'text', 'analyzer': 'ru'},
|
||||||
'text': {'type': 'text'},
|
'title': {'type': 'text', 'analyzer': 'ru'},
|
||||||
'author': {'type': 'text'},
|
# 'author': {'type': 'text'},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if self.client:
|
if self.client:
|
||||||
|
@ -135,8 +131,8 @@ class SearchService:
|
||||||
expected_mapping = {
|
expected_mapping = {
|
||||||
'properties': {
|
'properties': {
|
||||||
'body': {'type': 'text', 'analyzer': 'ru'},
|
'body': {'type': 'text', 'analyzer': 'ru'},
|
||||||
'text': {'type': 'text'},
|
'title': {'type': 'text', 'analyzer': 'ru'},
|
||||||
'author': {'type': 'text'},
|
# 'author': {'type': 'text'},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if mapping != expected_mapping:
|
if mapping != expected_mapping:
|
||||||
|
@ -150,13 +146,13 @@ class SearchService:
|
||||||
self.delete_index()
|
self.delete_index()
|
||||||
self.check_index()
|
self.check_index()
|
||||||
|
|
||||||
def index_post(self, shout):
|
def index(self, shout):
|
||||||
if self.client:
|
if self.client:
|
||||||
id_ = str(shout.id)
|
id_ = str(shout.id)
|
||||||
logger.debug(f' Индексируем пост {id_}')
|
logger.debug(f' Индексируем пост {id_}')
|
||||||
self.client.index(index=self.index_name, id=id_, body=shout)
|
self.client.index(index=self.index_name, id=id_, body=shout)
|
||||||
|
|
||||||
def search_post(self, query, limit, offset):
|
def search(self, query, limit, offset):
|
||||||
logger.debug(f'query: {query}')
|
logger.debug(f'query: {query}')
|
||||||
search_body = {
|
search_body = {
|
||||||
'query': {'match': {'_all': query}},
|
'query': {'match': {'_all': query}},
|
||||||
|
@ -177,7 +173,7 @@ class SearchService:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
search = SearchService()
|
search_service = SearchService()
|
||||||
|
|
||||||
|
|
||||||
async def search_text(text: str, limit: int = 50, offset: int = 0):
|
async def search_text(text: str, limit: int = 50, offset: int = 0):
|
||||||
|
@ -185,9 +181,9 @@ async def search_text(text: str, limit: int = 50, offset: int = 0):
|
||||||
try:
|
try:
|
||||||
# Use a key with a prefix to differentiate search results from other Redis data
|
# Use a key with a prefix to differentiate search results from other Redis data
|
||||||
redis_key = f'search:{text}'
|
redis_key = f'search:{text}'
|
||||||
if not search.client:
|
if not search_service.client:
|
||||||
# Use OpenSearchService.search_post method
|
# Use OpenSearchService.search_post method
|
||||||
payload = search.search_post(text, limit, offset)
|
payload = search_service.search(text, limit, offset)
|
||||||
# Use Redis as cache with TTL
|
# Use Redis as cache with TTL
|
||||||
await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(payload))
|
await redis.execute('SETEX', redis_key, REDIS_TTL, json.dumps(payload))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user