feat(search.py): separate indexing of Shout Title, shout Body and Authors
All checks were successful
Deploy on push / deploy (push) Successful in 39s

Stepan Vladovskiy 2025-04-20 19:22:08 -03:00
parent e382cc1ea5
commit 4d965fb27b
3 changed files with 340 additions and 211 deletions


@@ -71,6 +71,34 @@ class ShoutAuthor(Base):
class Shout(Base):
"""
A publication in the system.
Attributes:
body (str)
slug (str)
cover (str) : "Cover image url"
cover_caption (str) : "Cover image alt caption"
lead (str)
title (str)
subtitle (str)
layout (str)
media (dict)
authors (list[Author])
topics (list[Topic])
reactions (list[Reaction])
lang (str)
version_of (int)
oid (str)
seo (str) : JSON
draft (int)
created_at (int)
updated_at (int)
published_at (int)
featured_at (int)
deleted_at (int)
created_by (int)
updated_by (int)
deleted_by (int)
community (int)
"""
__tablename__ = "shout"


@@ -19,7 +19,7 @@ from sqlalchemy import (
inspect,
text,
)
from sqlalchemy.orm import Session, configure_mappers, declarative_base
from sqlalchemy.orm import Session, configure_mappers, declarative_base, joinedload
from sqlalchemy.sql.schema import Table
from settings import DB_URL
@@ -260,8 +260,11 @@ def get_json_builder():
# Use them in the code
json_builder, json_array_builder, json_cast = get_json_builder()
# Fetch all shouts, with authors preloaded
# This function is used for search indexing
async def fetch_all_shouts(session=None):
"""Fetch all published shouts for search indexing"""
"""Fetch all published shouts for search indexing with authors preloaded"""
from orm.shout import Shout
close_session = False
@@ -270,8 +270,10 @@ async def fetch_all_shouts(session=None):
close_session = True
try:
# Fetch only published and non-deleted shouts
query = session.query(Shout).filter(
# Fetch only published and non-deleted shouts with authors preloaded
query = session.query(Shout).options(
joinedload(Shout.authors)
).filter(
Shout.published_at.is_not(None),
Shout.deleted_at.is_(None)
)
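The joinedload above is what lets the indexer read shout.authors after the session is gone. A minimal sketch of how the eager-loaded query might be consumed for re-indexing, assuming a SQLAlchemy session factory and the Shout model shown earlier (the reindex_all helper name is illustrative):

from sqlalchemy.orm import joinedload


async def reindex_all(search_service, session_factory, Shout):
    """Fetch published, non-deleted shouts with authors preloaded, then bulk index them."""
    with session_factory() as session:
        shouts = (
            session.query(Shout)
            .options(joinedload(Shout.authors))  # eager load to avoid lazy loads after close
            .filter(
                Shout.published_at.is_not(None),
                Shout.deleted_at.is_(None),
            )
            .all()
        )
    # Authors are already attached to each shout, so indexing can run outside
    # the session without triggering detached-instance errors.
    await search_service.bulk_index(shouts)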


@@ -216,8 +216,9 @@ class SearchService:
"""Check if service is available"""
return self.available
async def verify_docs(self, doc_ids):
"""Verify which documents exist in the search index"""
"""Verify which documents exist in the search index across all content types"""
if not self.available:
return {"status": "disabled"}
@@ -231,14 +232,36 @@ class SearchService:
response.raise_for_status()
result = response.json()
# Log summary of verification results
missing_count = len(result.get("missing", []))
logger.info(f"Document verification complete: {missing_count} missing out of {len(doc_ids)} total")
# Process the more detailed response format
bodies_missing = set(result.get("bodies", {}).get("missing", []))
titles_missing = set(result.get("titles", {}).get("missing", []))
return result
# Combine missing IDs from both bodies and titles
# A document is considered missing if it's missing from either index
all_missing = list(bodies_missing.union(titles_missing))
# Log summary of verification results
bodies_missing_count = len(bodies_missing)
titles_missing_count = len(titles_missing)
total_missing_count = len(all_missing)
logger.info(f"Document verification complete: {bodies_missing_count} bodies missing, {titles_missing_count} titles missing")
logger.info(f"Total unique missing documents: {total_missing_count} out of {len(doc_ids)} total")
# Return in a backwards-compatible format plus the detailed breakdown
return {
"missing": all_missing,
"details": {
"bodies_missing": list(bodies_missing),
"titles_missing": list(titles_missing),
"bodies_missing_count": bodies_missing_count,
"titles_missing_count": titles_missing_count
}
}
except Exception as e:
logger.error(f"Document verification error: {e}")
return {"status": "error", "message": str(e)}
def index(self, shout):
"""Index a single document"""
@@ -249,68 +272,147 @@ class SearchService:
asyncio.create_task(self.perform_index(shout))
async def perform_index(self, shout):
"""Actually perform the indexing operation"""
"""Index a single document across multiple endpoints"""
if not self.available:
return
try:
# Combine all text fields
text = " ".join(filter(None, [
shout.title or "",
shout.subtitle or "",
shout.lead or "",
shout.body or "",
shout.media or ""
]))
logger.info(f"Indexing document {shout.id} to individual endpoints")
indexing_tasks = []
if not text.strip():
logger.warning(f"No text content to index for shout {shout.id}")
return
# 1. Index title if available
if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
title_doc = {
"id": str(shout.id),
"title": shout.title.strip()
}
indexing_tasks.append(
self.index_client.post("/index-title", json=title_doc)
)
# 2. Index body content (subtitle, lead, body)
body_text_parts = []
for field_name in ['subtitle', 'lead', 'body']:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
body_text_parts.append(field_value.strip())
# Process media content if available
media = getattr(shout, 'media', None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if 'title' in media_json:
body_text_parts.append(media_json['title'])
if 'body' in media_json:
body_text_parts.append(media_json['body'])
except json.JSONDecodeError:
body_text_parts.append(media)
elif isinstance(media, dict):
if 'title' in media:
body_text_parts.append(media['title'])
if 'body' in media:
body_text_parts.append(media['body'])
if body_text_parts:
body_text = " ".join(body_text_parts)
# Truncate if too long
MAX_TEXT_LENGTH = 4000
if len(body_text) > MAX_TEXT_LENGTH:
body_text = body_text[:MAX_TEXT_LENGTH]
logger.info(f"Indexing document: ID={shout.id}, Text length={len(text)}")
body_doc = {
"id": str(shout.id),
"body": body_text
}
indexing_tasks.append(
self.index_client.post("/index-body", json=body_doc)
)
# Send to txtai service
response = await self.client.post(
"/index",
json={"id": str(shout.id), "text": text}
)
response.raise_for_status()
result = response.json()
logger.info(f"Post {shout.id} successfully indexed: {result}")
# 3. Index authors
authors = getattr(shout, 'authors', [])
for author in authors:
author_id = str(getattr(author, 'id', 0))
if not author_id or author_id == '0':
continue
name = getattr(author, 'name', '')
# Combine bio and about fields
bio_parts = []
bio = getattr(author, 'bio', '')
if bio and isinstance(bio, str):
bio_parts.append(bio.strip())
about = getattr(author, 'about', '')
if about and isinstance(about, str):
bio_parts.append(about.strip())
combined_bio = " ".join(bio_parts)
if name:
author_doc = {
"id": author_id,
"name": name,
"bio": combined_bio
}
indexing_tasks.append(
self.index_client.post("/index-author", json=author_doc)
)
# Run all indexing tasks in parallel
if indexing_tasks:
responses = await asyncio.gather(*indexing_tasks, return_exceptions=True)
# Check for errors in responses
for i, response in enumerate(responses):
if isinstance(response, Exception):
logger.error(f"Error in indexing task {i}: {response}")
elif hasattr(response, 'status_code') and response.status_code >= 400:
logger.error(f"Error response in indexing task {i}: {response.status_code}, {await response.text()}")
logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints")
else:
logger.warning(f"No content to index for shout {shout.id}")
except Exception as e:
logger.error(f"Indexing error for shout {shout.id}: {e}")
async def bulk_index(self, shouts):
"""Index multiple documents at once with adaptive batch sizing"""
"""Index multiple documents across three separate endpoints"""
if not self.available or not shouts:
logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}")
return
start_time = time.time()
logger.info(f"Starting bulk indexing of {len(shouts)} documents")
logger.info(f"Starting multi-endpoint bulk indexing of {len(shouts)} documents")
MAX_TEXT_LENGTH = 4000 # Maximum text length to send in a single request
max_batch_size = MAX_BATCH_SIZE
total_indexed = 0
# Prepare documents for different endpoints
title_docs = []
body_docs = []
author_docs = {} # Use dict to prevent duplicate authors
total_skipped = 0
total_truncated = 0
total_retries = 0
# Group documents by size to process smaller documents in larger batches
small_docs = []
medium_docs = []
large_docs = []
# First pass: prepare all documents and categorize by size
for shout in shouts:
try:
text_fields = []
for field_name in ['title', 'subtitle', 'lead', 'body']:
# 1. Process title documents
if hasattr(shout, 'title') and shout.title and isinstance(shout.title, str):
title_docs.append({
"id": str(shout.id),
"title": shout.title.strip()
})
# 2. Process body documents (subtitle, lead, body)
body_text_parts = []
for field_name in ['subtitle', 'lead', 'body']:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
text_fields.append(field_value.strip())
body_text_parts.append(field_value.strip())
# Media field processing remains the same
# Process media content if available
media = getattr(shout, 'media', None)
if media:
if isinstance(media, str):
@@ -318,186 +420,180 @@ class SearchService:
media_json = json.loads(media)
if isinstance(media_json, dict):
if 'title' in media_json:
text_fields.append(media_json['title'])
body_text_parts.append(media_json['title'])
if 'body' in media_json:
text_fields.append(media_json['body'])
body_text_parts.append(media_json['body'])
except json.JSONDecodeError:
text_fields.append(media)
body_text_parts.append(media)
elif isinstance(media, dict):
if 'title' in media:
text_fields.append(media['title'])
body_text_parts.append(media['title'])
if 'body' in media:
text_fields.append(media['body'])
body_text_parts.append(media['body'])
text = " ".join(text_fields)
if not text.strip():
total_skipped += 1
continue
# Truncate text if it exceeds the maximum length
original_length = len(text)
if original_length > MAX_TEXT_LENGTH:
text = text[:MAX_TEXT_LENGTH]
total_truncated += 1
document = {
"id": str(shout.id),
"text": text
}
# Categorize by size
text_len = len(text)
if text_len > 5000:
large_docs.append(document)
elif text_len > 2000:
medium_docs.append(document)
else:
small_docs.append(document)
total_indexed += 1
# Only add body document if we have body text
if body_text_parts:
body_text = " ".join(body_text_parts)
# Truncate if too long
MAX_TEXT_LENGTH = 4000
if len(body_text) > MAX_TEXT_LENGTH:
body_text = body_text[:MAX_TEXT_LENGTH]
body_docs.append({
"id": str(shout.id),
"body": body_text
})
# 3. Process authors if available
authors = getattr(shout, 'authors', [])
for author in authors:
author_id = str(getattr(author, 'id', 0))
if not author_id or author_id == '0':
continue
# Skip if we've already processed this author
if author_id in author_docs:
continue
name = getattr(author, 'name', '')
# Combine bio and about fields
bio_parts = []
bio = getattr(author, 'bio', '')
if bio and isinstance(bio, str):
bio_parts.append(bio.strip())
about = getattr(author, 'about', '')
if about and isinstance(about, str):
bio_parts.append(about.strip())
combined_bio = " ".join(bio_parts)
# Only add if we have author data
if name:
author_docs[author_id] = {
"id": author_id,
"name": name,
"bio": combined_bio
}
except Exception as e:
logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
total_skipped += 1
# Process each category with appropriate batch sizes
logger.info(f"Documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
# Convert author dict to list
author_docs_list = list(author_docs.values())
# Process small documents (larger batches)
if small_docs:
batch_size = min(max_batch_size, 15)
await self._process_document_batches(small_docs, batch_size, "small")
# Process medium documents (medium batches)
if medium_docs:
batch_size = min(max_batch_size, 10)
await self._process_document_batches(medium_docs, batch_size, "medium")
# Process large documents (small batches)
if large_docs:
batch_size = min(max_batch_size, 3)
await self._process_document_batches(large_docs, batch_size, "large")
# Process each endpoint in parallel
indexing_tasks = [
self._index_endpoint(title_docs, "/bulk-index-titles", "title"),
self._index_endpoint(body_docs, "/bulk-index-bodies", "body"),
self._index_endpoint(author_docs_list, "/bulk-index-authors", "author")
]
await asyncio.gather(*indexing_tasks)
elapsed = time.time() - start_time
logger.info(f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, {total_skipped} skipped, {total_truncated} truncated, {total_retries} retries")
async def _process_document_batches(self, documents, batch_size, size_category):
"""Process document batches with retry logic"""
# Check for possible database corruption before starting
db_error_count = 0
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
batch_id = f"{size_category}-{i//batch_size + 1}"
logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
retry_count = 0
max_retries = 3
success = False
# Process with retries
while not success and retry_count < max_retries:
try:
logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
response = await self.index_client.post(
"/bulk-index",
json=batch,
timeout=120.0 # Explicit longer timeout for large batches
)
# Handle 422 validation errors - these won't be fixed by retrying
if response.status_code == 422:
error_detail = response.json()
truncated_error = self._truncate_error_detail(error_detail)
logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
break
# Handle 500 server errors - these might be fixed by retrying with smaller batches
elif response.status_code == 500:
db_error_count += 1
# If we've seen multiple 500s, log a critical error
if db_error_count >= 3:
logger.critical(f"Multiple server errors detected (500). The search service may need manual intervention. Stopping batch {batch_id} processing.")
break
# Try again with exponential backoff
if retry_count < max_retries - 1:
retry_count += 1
wait_time = (2 ** retry_count) + (random.random() * 0.5) # Exponential backoff with jitter
await asyncio.sleep(wait_time)
continue
# Final retry, split the batch
elif len(batch) > 1:
mid = len(batch) // 2
await self._process_single_batch(batch[:mid], f"{batch_id}-A")
await self._process_single_batch(batch[mid:], f"{batch_id}-B")
break
else:
# Can't split a single document
break
# Normal success case
response.raise_for_status()
success = True
db_error_count = 0 # Reset error counter on success
except Exception as e:
error_str = str(e).lower()
if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
db_error_count += 1
if db_error_count >= 2:
logger.critical(f"Potential database corruption detected: {error_str}. The search service may need manual intervention. Stopping batch {batch_id} processing.")
break
if retry_count < max_retries - 1:
retry_count += 1
wait_time = (2 ** retry_count) + (random.random() * 0.5)
await asyncio.sleep(wait_time)
else:
if len(batch) > 1:
mid = len(batch) // 2
await self._process_single_batch(batch[:mid], f"{batch_id}-A")
await self._process_single_batch(batch[mid:], f"{batch_id}-B")
break
async def _process_single_batch(self, documents, batch_id):
"""Process a single batch with maximum reliability"""
max_retries = 3
retry_count = 0
logger.info(
f"Multi-endpoint indexing completed in {elapsed:.2f}s: "
f"{len(title_docs)} titles, {len(body_docs)} bodies, {len(author_docs_list)} authors, "
f"{total_skipped} shouts skipped"
)
while retry_count < max_retries:
try:
if not documents:
return
async def _index_endpoint(self, documents, endpoint, doc_type):
"""Process and index documents to a specific endpoint"""
if not documents:
logger.info(f"No {doc_type} documents to index")
return
logger.info(f"Indexing {len(documents)} {doc_type} documents")
# Categorize documents by size
small_docs, medium_docs, large_docs = self._categorize_by_size(documents, doc_type)
# Process each category with appropriate batch sizes
batch_sizes = {
"small": min(MAX_BATCH_SIZE, 15),
"medium": min(MAX_BATCH_SIZE, 10),
"large": min(MAX_BATCH_SIZE, 3)
}
for category, docs in [("small", small_docs), ("medium", medium_docs), ("large", large_docs)]:
if docs:
batch_size = batch_sizes[category]
await self._process_batches(docs, batch_size, endpoint, f"{doc_type}-{category}")
def _categorize_by_size(self, documents, doc_type):
"""Categorize documents by size for optimized batch processing"""
small_docs = []
medium_docs = []
large_docs = []
for doc in documents:
# Extract relevant text based on document type
if doc_type == "title":
text = doc.get("title", "")
elif doc_type == "body":
text = doc.get("body", "")
else: # author
# For authors, consider both name and bio length
text = doc.get("name", "") + " " + doc.get("bio", "")
text_len = len(text)
if text_len > 5000:
large_docs.append(doc)
elif text_len > 2000:
medium_docs.append(doc)
else:
small_docs.append(doc)
logger.info(f"{doc_type.capitalize()} documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
return small_docs, medium_docs, large_docs
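The size thresholds (over 5000 characters is large, over 2000 is medium) exist only to pick a batch size per category. A tiny self-contained illustration of that mapping, with MAX_BATCH_SIZE assumed to be a module-level constant as in the rest of search.py:

MAX_BATCH_SIZE = 25  # assumption: the real value is defined elsewhere in search.py


def pick_batch_size(text: str) -> int:
    if len(text) > 5000:
        return min(MAX_BATCH_SIZE, 3)   # large documents: few per request
    if len(text) > 2000:
        return min(MAX_BATCH_SIZE, 10)  # medium documents
    return min(MAX_BATCH_SIZE, 15)      # small documents: pack more per request


assert pick_batch_size("x" * 100) == 15
assert pick_batch_size("x" * 3000) == 10
assert pick_batch_size("x" * 6000) == 3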
async def _process_batches(self, documents, batch_size, endpoint, batch_prefix):
"""Process document batches with retry logic"""
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
batch_id = f"{batch_prefix}-{i//batch_size + 1}"
retry_count = 0
max_retries = 3
success = False
while not success and retry_count < max_retries:
try:
logger.info(f"Sending batch {batch_id} ({len(batch)} docs) to {endpoint}")
response = await self.index_client.post(
endpoint,
json=batch,
timeout=90.0
)
if response.status_code == 422:
error_detail = response.json()
logger.error(f"Validation error from search service for batch {batch_id}: {self._truncate_error_detail(error_detail)}")
break
response.raise_for_status()
success = True
logger.info(f"Successfully indexed batch {batch_id}")
except Exception as e:
retry_count += 1
if retry_count >= max_retries:
if len(batch) > 1:
mid = len(batch) // 2
logger.warning(f"Splitting batch {batch_id} into smaller batches for retry")
await self._process_batches(batch[:mid], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-A")
await self._process_batches(batch[mid:], batch_size // 2, endpoint, f"{batch_prefix}-{i//batch_size}-B")
else:
logger.error(f"Failed to index single document in batch {batch_id} after {max_retries} attempts: {str(e)}")
break
response = await self.index_client.post(
"/bulk-index",
json=documents,
timeout=90.0
)
response.raise_for_status()
return # Success, exit the retry loop
except Exception as e:
error_str = str(e).lower()
retry_count += 1
if "dictionary changed size" in error_str or "transaction error" in error_str:
wait_time = (2 ** retry_count) + (random.random() * 0.5)
await asyncio.sleep(wait_time) # Wait for txtai to recover
continue
if retry_count >= max_retries and len(documents) > 1:
for i, doc in enumerate(documents):
try:
resp = await self.index_client.post("/index", json=doc, timeout=30.0)
resp.raise_for_status()
except Exception as e2:
pass
return # Exit after individual processing attempt
logger.warning(f"Retrying batch {batch_id} in {wait_time:.1f}s... (attempt {retry_count+1}/{max_retries})")
await asyncio.sleep(wait_time)
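The retry strategy in _process_batches combines exponential backoff with jitter and, as a last resort, splits the failing batch in half. An isolated sketch of that pattern; send() stands in for index_client.post(endpoint, json=batch):

import asyncio
import random


async def send_with_backoff(send, batch, batch_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            await send(batch)
            return True
        except Exception:
            if attempt < max_retries - 1:
                # 2s, 4s, ... plus up to 0.5s of jitter to avoid synchronized retries
                await asyncio.sleep((2 ** (attempt + 1)) + random.random() * 0.5)
            elif len(batch) > 1:
                # Halve the batch so a single bad document cannot sink the whole request
                mid = len(batch) // 2
                await send_with_backoff(send, batch[:mid], f"{batch_id}-A", max_retries)
                await send_with_backoff(send, batch[mid:], f"{batch_id}-B", max_retries)
            else:
                print(f"batch {batch_id}: giving up on a single document")
    return False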
def _truncate_error_detail(self, error_detail):
"""Truncate error details for logging"""
@@ -632,7 +728,7 @@ async def initialize_search_index(shouts_data):
return
index_stats = info.get("index_stats", {})
indexed_doc_count = index_stats.get("document_count", 0)
indexed_doc_count = index_stats.get("total_count", 0)
index_status = await search_service.check_index_status()
if index_status.get("status") == "inconsistent":