inner-search
All checks were successful
Deploy to core / deploy (push) Successful in 1m39s

Untone 2024-01-29 03:27:30 +03:00
parent 35f7a35f27
commit 2c2932caeb
6 changed files with 166 additions and 57 deletions

View File

@@ -16,6 +16,7 @@ from starlette.routing import Route
 from resolvers.webhook import WebhookEndpoint
 from services.rediscache import redis
 from services.schema import resolvers
+from services.search import SearchService
 from services.viewed import ViewedStorage
 from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN
@@ -33,6 +34,9 @@ async def start_up():
     # start viewed service
     await ViewedStorage.init()

+    # start search service
+    await SearchService.init()
+
     if MODE == 'development':
         # pid file management
         if not exists(DEV_SERVER_PID_FILE_NAME):
@@ -62,5 +66,8 @@ async def shutdown():
     await redis.disconnect()


-routes = [Route('/', GraphQL(schema, debug=True)), Route('/new-author', WebhookEndpoint)]
+routes = [
+    Route('/', GraphQL(schema, debug=True)),
+    Route('/new-author', WebhookEndpoint),
+]
 app = Starlette(routes=routes, debug=True, on_startup=[start_up], on_shutdown=[shutdown])

View File

@@ -19,6 +19,7 @@ aiohttp = "^3.9.1"
 pre-commit = "^3.6.0"
 granian = "^1.0.1"
 google-analytics-data = "^0.18.3"
+elasticsearch = "^8.12.0"

 [tool.poetry.group.dev.dependencies]
 setuptools = "^69.0.2"

View File

@@ -11,6 +11,7 @@ from services.auth import login_required
 from services.db import local_session
 from services.notify import notify_shout
 from services.schema import mutation, query
+from services.search import SearchService


 @query.field('get_shouts_drafts')
@@ -79,7 +80,11 @@ async def create_shout(_, info, inp):
         reactions_follow(author.id, shout.id, True)

+        # notifier
         await notify_shout(shout_dict, 'create')
+
+        # search service indexing
+        SearchService.elastic.index_post(shout)

         return {'shout': shout_dict}
@@ -186,6 +191,9 @@ async def update_shout(  # noqa: C901
     if not publish:
         await notify_shout(shout_dict, 'update')

+    # search service indexing
+    SearchService.elastic.index_post(shout)
+
     return {'shout': shout_dict}
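As the service code further down shows, SearchService.elastic starts out as None and init() only logs connection errors, so these resolver calls assume the search backend came up. A defensive variant (hypothetical, not part of this commit) would guard the call:

# Hypothetical guard, not in the commit: skip indexing when the search
# backend failed to initialize, instead of raising AttributeError here.
if SearchService.elastic is not None:
    SearchService.elastic.index_post(shout)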

View File

@@ -82,9 +82,7 @@ async def get_shout(_, _info, slug=None, shout_id=None):
                 'rating': int(likes_stat or 0) - int(dislikes_stat or 0),
             }
-            for author_caption in (
-                session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug)
-            ):
+            for author_caption in session.query(ShoutAuthor).join(Shout).where(Shout.slug == slug):
                 for author in shout.authors:
                     if author.id == author_caption.author:
                         author.caption = author_caption.caption
@@ -105,9 +103,7 @@ async def get_shout(_, _info, slug=None, shout_id=None):
            shout.main_topic = main_topic[0]
            return shout
    except Exception:
-        raise HTTPException(
-            status_code=404, detail=f'shout {slug or shout_id} not found'
-        )
+        raise HTTPException(status_code=404, detail=f'shout {slug or shout_id} not found')


 @query.field('load_shouts_by')
@@ -153,9 +149,7 @@ async def load_shouts_by(_, _info, options):
    # order
    order_by = options.get('order_by', Shout.published_at)
-    query_order_by = (
-        desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
-    )
+    query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
    q = q.order_by(nulls_last(query_order_by))

    # limit offset
@@ -248,20 +242,15 @@ async def load_shouts_feed(_, info, options):
    with local_session() as session:
        reader = session.query(Author).filter(Author.user == user_id).first()
        if reader:
-            reader_followed_authors = select(AuthorFollower.author).where(
-                AuthorFollower.follower == reader.id
-            )
-            reader_followed_topics = select(TopicFollower.topic).where(
-                TopicFollower.follower == reader.id
-            )
+            reader_followed_authors = select(AuthorFollower.author).where(AuthorFollower.follower == reader.id)
+            reader_followed_topics = select(TopicFollower.topic).where(TopicFollower.follower == reader.id)

            subquery = (
                select(Shout.id)
                .where(Shout.id == ShoutAuthor.shout)
                .where(Shout.id == ShoutTopic.shout)
                .where(
-                    (ShoutAuthor.author.in_(reader_followed_authors))
-                    | (ShoutTopic.topic.in_(reader_followed_topics))
+                    (ShoutAuthor.author.in_(reader_followed_authors)) | (ShoutTopic.topic.in_(reader_followed_topics))
                )
            )
@@ -286,24 +275,15 @@ async def load_shouts_feed(_, info, options):
        order_by = options.get('order_by', Shout.published_at)
-        query_order_by = (
-            desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
-        )
+        query_order_by = desc(order_by) if options.get('order_by_desc', True) else asc(order_by)
        offset = options.get('offset', 0)
        limit = options.get('limit', 10)

-        q = (
-            q.group_by(Shout.id)
-            .order_by(nulls_last(query_order_by))
-            .limit(limit)
-            .offset(offset)
-        )
+        q = q.group_by(Shout.id).order_by(nulls_last(query_order_by)).limit(limit).offset(offset)

        # print(q.compile(compile_kwargs={"literal_binds": True}))
-        for [shout, reacted_stat, commented_stat, _last_comment] in session.execute(
-            q
-        ).unique():
+        for [shout, reacted_stat, commented_stat, _last_comment] in session.execute(q).unique():
            main_topic = (
                session.query(Topic.slug)
                .join(
@@ -342,9 +322,7 @@ async def load_shouts_search(_, _info, text, limit=50, offset=0):
            select(
                [
                    Shout,
-                    literal_column(
-                        f"({results_dict.get(Shout.slug, {}).get('score', 0)})"
-                    ).label('score'),
+                    literal_column(f"({results_dict.get(Shout.slug, {}).get('score', 0)})").label('score'),
                ]
            )
            .select_from(Shout)
@@ -394,9 +372,7 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
            and_(
                Reaction.shout == Shout.id,
                Reaction.replyTo.is_(None),
-                Reaction.kind.in_(
-                    [ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]
-                ),
+                Reaction.kind.in_([ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]),
            ),
        )
        .outerjoin(Author, Author.user == bindparam('user_id'))
@@ -465,9 +441,7 @@ async def load_shouts_random_top(_, _info, options):
    aliased_reaction = aliased(Reaction)
-    subquery = (
-        select(Shout.id).outerjoin(aliased_reaction).where(Shout.deleted_at.is_(None))
-    )
+    subquery = select(Shout.id).outerjoin(aliased_reaction).where(Shout.deleted_at.is_(None))
    subquery = apply_filters(subquery, options.get('filters', {}))
    subquery = subquery.group_by(Shout.id).order_by(

View File

@@ -1,42 +1,161 @@
 import asyncio
 import json
 import logging
+import os
 from typing import List

-import aiohttp
+from elasticsearch import Elasticsearch

 from orm.shout import Shout  # Adjust the import as needed
 from services.rediscache import redis  # Adjust the import as needed

+logger = logging.getLogger('[services.search] ')
+logger.setLevel(logging.DEBUG)
+
+ELASTIC_HOST = os.environ.get('ELASTIC_HOST', 'localhost').replace('https://', '').replace('http://', '')
+ELASTIC_USER = os.environ.get('ELASTIC_USER', '')
+ELASTIC_PASSWORD = os.environ.get('ELASTIC_PASSWORD', '')
+ELASTIC_PORT = os.environ.get('ELASTIC_PORT', 9200)
+ELASTIC_AUTH = f'{ELASTIC_USER}:{ELASTIC_PASSWORD}' if ELASTIC_USER else ''
+ELASTIC_URL = f'https://{ELASTIC_AUTH}@{ELASTIC_HOST}:{ELASTIC_PORT}'
+
+
+class OpenSearchService:
+    def __init__(self, index_name, delete_index_on_startup):
+        self.index_name = index_name
+        self.delete_index_on_startup = delete_index_on_startup
+        self.elasticsearch_client = Elasticsearch(f'{ELASTIC_URL}')
+
+        if self.delete_index_on_startup:
+            self.delete_index()
+
+        self.check_index()
+
+    def delete_index(self):
+        self.elasticsearch_client.indices.delete(index=self.index_name, ignore_unavailable=True)
+
+    def create_index(self):
+        index_settings = {
+            'settings': {
+                'index': {
+                    'number_of_shards': 1,
+                    'auto_expand_replicas': '0-all',
+                },
+                'analysis': {
+                    'analyzer': {
+                        'ru': {
+                            'tokenizer': 'standard',
+                            'filter': ['lowercase', 'ru_stop', 'ru_stemmer'],
+                        }
+                    },
+                    'filter': {
+                        'ru_stemmer': {
+                            'type': 'stemmer',
+                            'language': 'russian',
+                        },
+                        'ru_stop': {
+                            'type': 'stop',
+                            'stopwords': '_russian_',
+                        },
+                    },
+                },
+            },
+            'mappings': {
+                'properties': {
+                    'body': {
+                        'type': 'text',
+                        'analyzer': 'ru',
+                    },
+                    'text': {'type': 'text'},
+                    'author': {'type': 'text'},
+                }
+            },
+        }
+        self.elasticsearch_client.indices.create(index=self.index_name, body=index_settings)
+        self.elasticsearch_client.indices.close(index=self.index_name)
+        self.elasticsearch_client.indices.open(index=self.index_name)
+
+    def put_mapping(self):
+        mapping = {
+            'properties': {
+                'body': {
+                    'type': 'text',
+                    'analyzer': 'ru',
+                },
+                'text': {'type': 'text'},
+                'author': {'type': 'text'},
+            }
+        }
+        self.elasticsearch_client.indices.put_mapping(index=self.index_name, body=mapping)
+
+    def check_index(self):
+        if not self.elasticsearch_client.indices.exists(index=self.index_name):
+            logger.debug(f'Creating {self.index_name} index')
+            self.create_index()
+            self.put_mapping()
+
+    def index_post(self, shout):
+        id_ = str(shout.id)
+        logger.debug(f'Indexing post id {id_}')
+        self.elasticsearch_client.index(index=self.index_name, id=id_, body=shout)
+
+    def search_post(self, query, limit, offset):
+        logger.debug(f'Search query = {query}, limit = {limit}')
+        search_body = {
+            'query': {
+                'match': {
+                    '_all': query,
+                }
+            }
+        }
+        search_response = self.elasticsearch_client.search(
+            index=self.index_name, body=search_body, size=limit, from_=offset
+        )
+        hits = search_response['hits']['hits']
+        return [
+            {
+                **hit['_source'],
+                'score': hit['_score'],
+            }
+            for hit in hits
+        ]
+
+
 class SearchService:
     lock = asyncio.Lock()
+    elastic = None

     @staticmethod
-    async def init(session):
-        async with SearchService.lock:
-            logging.info('[services.search] Initializing SearchService')
+    async def init():
+        self = SearchService
+        async with self.lock:
+            logging.info('Initializing SearchService')
+            try:
+                self.elastic = OpenSearchService('shouts_index', False)
+            except Exception as exc:
+                logger.error(exc)

     @staticmethod
     async def search(text: str, limit: int = 50, offset: int = 0) -> List[Shout]:
         payload = []
+        self = SearchService
         try:
             # TODO: add ttl for redis cached search results
             cached = await redis.execute('GET', text)
             if not cached:
-                async with SearchService.lock:
-                    # Use aiohttp to send a request to ElasticSearch
-                    async with aiohttp.ClientSession() as session:
-                        search_url = f'https://search.discours.io/search?q={text}'
-                        async with session.get(search_url) as response:
-                            if response.status == 200:
-                                payload = await response.json()
-                                await redis.execute('SET', text, json.dumps(payload))  # use redis as cache
-                            else:
-                                logging.error(f'[services.search] response: {response.status} {await response.text()}')
+                async with self.lock:
+                    # Use OpenSearchService.search_post method
+                    payload = self.elastic.search_post(text, limit, offset)
+                    # Use Redis as cache
+                    await redis.execute('SET', text, json.dumps(payload))
             elif isinstance(cached, str):
                 payload = json.loads(cached)
         except Exception as e:
-            logging.error(f'[services.search] Error during search: {e}')
-        return payload
+            logging.error(f'Error during search: {e}')
+        return payload[offset : offset + limit]
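A minimal end-to-end sketch of how the new pieces fit together, assuming the ELASTIC_* environment variables point at a reachable Elasticsearch instance and Redis is connected (in the running app both happen during start_up()); the query string and printed fields are illustrative only:

import asyncio

from services.search import SearchService


async def demo():
    # Mirrors start_up() in main.py: creates the 'shouts_index' index if it is
    # missing and stores an OpenSearchService instance on SearchService.elastic.
    await SearchService.init()

    # Resolvers call SearchService.elastic.index_post(shout) after create/update;
    # reads go through SearchService.search, which caches the result payload in
    # Redis keyed by the query text.
    results = await SearchService.search('пример', limit=10, offset=0)
    for hit in results:
        print(hit.get('score'), hit)


asyncio.run(demo())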

View File

@@ -49,7 +49,7 @@ class ViewedStorage:
         self = ViewedStorage
         async with self.lock:
             os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', GOOGLE_KEYFILE_PATH)
-            if GOOGLE_KEYFILE_PATH:
+            if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
                 # Using a default constructor instructs the client to use the credentials
                 # specified in GOOGLE_APPLICATION_CREDENTIALS environment variable.
                 self.analytics_client = BetaAnalyticsDataClient()