debug: remove debug logging, clean up
All checks were successful
Deploy on push / deploy (push) Successful in 1m27s
This commit is contained in:
parent c533241d1e
commit 106222b0e0
@@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
    """
    shouts = []
    try:
-       # logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}")
        q = q.limit(limit).offset(offset)

        with local_session() as session:
            shouts_result = session.execute(q).all()
-           # logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query")

            if not shouts_result:
                logger.warning("No shouts found in query result")
@@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                shout = None
                if hasattr(row, "Shout"):
                    shout = row.Shout
-                   # logger.debug(f"Processing shout#{shout.id} at index {idx}")
                if shout:
                    shout_id = int(f"{shout.id}")
                    shout_dict = shout.dict()
@@ -231,20 +228,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                    topics = None
                    if has_field(info, "topics") and hasattr(row, "topics"):
                        topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
-                       # logger.debug(f"Shout#{shout_id} topics: {topics}")
                        shout_dict["topics"] = topics

                    if has_field(info, "main_topic"):
                        main_topic = None
                        if hasattr(row, "main_topic"):
-                           # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
                            main_topic = (
                                orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
                            )
-                           # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")

                        if not main_topic and topics and len(topics) > 0:
-                           # logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list")
                            main_topic = {
                                "id": topics[0]["id"],
                                "title": topics[0]["title"],
@@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                                "is_main": True,
                            }
                        elif not main_topic:
-                           logger.debug(f"No main_topic and no topics found for shout#{shout_id}")
                            main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
                        shout_dict["main_topic"] = main_topic
-                       logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}")

                    if has_field(info, "authors") and hasattr(row, "authors"):
                        shout_dict["authors"] = (
@@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
        logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True)
        raise
    finally:
-       logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links")
        return shouts
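The main_topic handling above boils down to a small fallback rule: keep an explicit main topic, otherwise promote the first related topic, otherwise use a placeholder. A minimal sketch of that rule in isolation, assuming topics are plain dicts with id/title/slug keys; the helper name is hypothetical:

def resolve_main_topic(main_topic, topics):
    """Keep an explicit main topic, fall back to the first related topic, else a placeholder."""
    if main_topic:
        return {**main_topic, "is_main": True}
    if topics:
        first = topics[0]
        return {"id": first["id"], "title": first["title"], "slug": first.get("slug", ""), "is_main": True}
    return {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}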
@@ -84,7 +84,6 @@ class SearchCache:
            if cached_data:
                all_results = json.loads(cached_data)
                logger.info(f"Retrieved search results for '{query}' from Redis")
                # Redis TTL is auto-extended when setting the key with expiry
        except Exception as e:
            logger.error(f"Error retrieving search results from Redis: {e}")
@@ -96,7 +95,7 @@ class SearchCache:

        # If not found in any cache
        if all_results is None:
-           logger.debug(f"Cache miss for query '{query}'")
+           logger.info(f"Cache miss for query '{query}'")
            return None

        # Return paginated subset
@@ -314,7 +313,6 @@ class SearchService:
            # Media field processing remains the same
            media = getattr(shout, 'media', None)
            if media:
                # Your existing media processing logic
                if isinstance(media, str):
                    try:
                        media_json = json.loads(media)
@@ -334,7 +332,6 @@ class SearchService:
            text = " ".join(text_fields)

            if not text.strip():
-               logger.debug(f"Skipping shout {shout.id}: no text content")
                total_skipped += 1
                continue
@@ -342,7 +339,6 @@ class SearchService:
            original_length = len(text)
            if original_length > MAX_TEXT_LENGTH:
                text = text[:MAX_TEXT_LENGTH]
                logger.info(f"Truncated document {shout.id} from {original_length} to {MAX_TEXT_LENGTH} chars")
                total_truncated += 1

            document = {
@@ -403,10 +399,6 @@ class SearchService:
        # Process with retries
        while not success and retry_count < max_retries:
            try:
-               if batch:
-                   sample = batch[0]
-                   logger.info(f"Sample document in batch {batch_id}: id={sample['id']}, text_length={len(sample['text'])}")
-
                logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
                response = await self.index_client.post(
                    "/bulk-index",
@@ -434,31 +426,25 @@ class SearchService:
                    if retry_count < max_retries - 1:
                        retry_count += 1
                        wait_time = (2 ** retry_count) + (random.random() * 0.5)  # Exponential backoff with jitter
                        logger.warning(f"Server error for batch {batch_id}, retrying in {wait_time:.1f}s (attempt {retry_count+1}/{max_retries})")
                        await asyncio.sleep(wait_time)
                        continue

                    # Final retry, split the batch
                    elif len(batch) > 1:
                        logger.warning(f"Splitting batch {batch_id} after repeated failures")
                        mid = len(batch) // 2
                        await self._process_single_batch(batch[:mid], f"{batch_id}-A")
                        await self._process_single_batch(batch[mid:], f"{batch_id}-B")
                        break
                    else:
                        # Can't split a single document
                        logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts")
                        break

                # Normal success case
                response.raise_for_status()
                result = response.json()
                logger.info(f"Batch {batch_id} indexed successfully: {result}")
                success = True
                db_error_count = 0  # Reset error counter on success

            except Exception as e:
                # Check if it looks like a database corruption error
                error_str = str(e).lower()
                if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
                    db_error_count += 1
@@ -469,17 +455,12 @@ class SearchService:
                    if retry_count < max_retries - 1:
                        retry_count += 1
                        wait_time = (2 ** retry_count) + (random.random() * 0.5)
                        logger.warning(f"Error for batch {batch_id}, retrying in {wait_time:.1f}s: {str(e)[:200]}")
                        await asyncio.sleep(wait_time)
                    else:
                        # Last resort - try to split the batch
                        if len(batch) > 1:
                            logger.warning(f"Splitting batch {batch_id} after exception: {str(e)[:200]}")
                            mid = len(batch) // 2
                            await self._process_single_batch(batch[:mid], f"{batch_id}-A")
                            await self._process_single_batch(batch[mid:], f"{batch_id}-B")
                        else:
                            logger.error(f"Failed to index document {batch[0]['id']} after {max_retries} attempts: {e}")
                        break

    async def _process_single_batch(self, documents, batch_id):
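The two hunks above share one retry shape: exponential backoff with jitter, then splitting the batch in half as a last resort. A minimal sketch of that shape, assuming an async send(batch) callable that raises on failure; all names here are illustrative, not part of this codebase:

import asyncio
import random

async def send_with_retries(send, batch, batch_id, max_retries=3):
    """Retry a batch with exponential backoff plus jitter; split it in half once retries run out."""
    for attempt in range(max_retries):
        try:
            await send(batch)
            return
        except Exception:
            if attempt < max_retries - 1:
                # 2, 4, 8... seconds plus up to 0.5 s of jitter, the same shape as the diff above
                wait_time = (2 ** (attempt + 1)) + (random.random() * 0.5)
                await asyncio.sleep(wait_time)
            elif len(batch) > 1:
                mid = len(batch) // 2
                await send_with_retries(send, batch[:mid], f"{batch_id}-A", max_retries)
                await send_with_retries(send, batch[mid:], f"{batch_id}-B", max_retries)
            # a single document that still fails is given up on (the real code logs it)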
@@ -492,41 +473,30 @@ class SearchService:
                if not documents:
                    return

                logger.info(f"Processing sub-batch {batch_id} with {len(documents)} documents")
                response = await self.index_client.post(
                    "/bulk-index",
                    json=documents,
                    timeout=90.0
                )
                response.raise_for_status()
                result = response.json()
                logger.info(f"Sub-batch {batch_id} indexed successfully: {result}")
                return  # Success, exit the retry loop

            except Exception as e:
                error_str = str(e).lower()
                retry_count += 1

                # Check if it's a transient error that txtai might recover from internally
                if "dictionary changed size" in error_str or "transaction error" in error_str:
                    wait_time = (2 ** retry_count) + (random.random() * 0.5)
                    logger.warning(f"Transient txtai error in sub-batch {batch_id}, waiting {wait_time:.1f}s for recovery: {str(e)[:200]}")
                    await asyncio.sleep(wait_time)  # Wait for txtai to recover
-                   continue  # Try again
+                   continue

                # For other errors or final retry failure
                logger.error(f"Error indexing sub-batch {batch_id} (attempt {retry_count}/{max_retries}): {str(e)[:200]}")

                # Only try one-by-one on the final retry
                if retry_count >= max_retries and len(documents) > 1:
                    logger.info(f"Processing documents in sub-batch {batch_id} individually")
                    for i, doc in enumerate(documents):
                        try:
                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
                            resp.raise_for_status()
                            logger.info(f"Indexed document {doc['id']} individually")
                        except Exception as e2:
                            logger.error(f"Failed to index document {doc['id']} individually: {str(e2)[:100]}")
                            pass
                    return  # Exit after individual processing attempt

    def _truncate_error_detail(self, error_detail):
@@ -537,13 +507,11 @@ class SearchService:
        for i, item in enumerate(truncated_detail['detail']):
            if isinstance(item, dict) and 'input' in item:
                if isinstance(item['input'], dict) and any(k in item['input'] for k in ['documents', 'text']):
                    # Check for documents list
                    if 'documents' in item['input'] and isinstance(item['input']['documents'], list):
                        for j, doc in enumerate(item['input']['documents']):
                            if 'text' in doc and isinstance(doc['text'], str) and len(doc['text']) > 100:
                                item['input']['documents'][j]['text'] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]"

                    # Check for direct text field
                    if 'text' in item['input'] and isinstance(item['input']['text'], str) and len(item['input']['text']) > 100:
                        item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
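_truncate_error_detail walks a validation-error payload and shortens any long text fields before they are logged. The core truncation step as a stand-alone sketch; the 100-character limit mirrors the code above, and the helper name is hypothetical:

def truncate_text(value: str, limit: int = 100) -> str:
    """Shorten a long string for logging while keeping a hint of its original size."""
    if len(value) > limit:
        return f"{value[:limit]}... [truncated, total {len(value)} chars]"
    return value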
@@ -552,11 +520,9 @@ class SearchService:
    async def search(self, text, limit, offset):
        """Search documents"""
        if not self.available:
            logger.warning("Search not available")
            return []

        if not isinstance(text, str) or not text.strip():
            logger.warning(f"Invalid search text: {text}")
            return []

        logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")
@@ -567,7 +533,6 @@ class SearchService:
        if has_cache:
            cached_results = await self.cache.get(text, limit, offset)
            if cached_results is not None:
                logger.info(f"Serving search results for '{text}' from cache (offset={offset}, limit={limit})")
                return cached_results

        # Not in cache or cache disabled, perform new search
@@ -575,61 +540,40 @@ class SearchService:
        search_limit = limit
        search_offset = offset

        # Always prefetch full results when caching is enabled
        if SEARCH_CACHE_ENABLED:
-           search_limit = SEARCH_PREFETCH_SIZE  # Always fetch a large set
-           search_offset = 0  # Always start from beginning
+           search_limit = SEARCH_PREFETCH_SIZE
+           search_offset = 0
        else:
            search_limit = limit
            search_offset = offset

        logger.info(f"Sending search request: text='{text}', limit={search_limit}, offset={search_offset}")
        response = await self.client.post(
            "/search",
            json={"text": text, "limit": search_limit, "offset": search_offset}
        )
        response.raise_for_status()

-       # logger.info(f"Raw search response: {response.text}")
        result = response.json()
-       # logger.info(f"Parsed search response: {result}")

        formatted_results = result.get("results", [])

        # Filter out non-numeric IDs to prevent database errors
        valid_results = []
        for item in formatted_results:
            doc_id = item.get("id")
            if doc_id and doc_id.isdigit():
                valid_results.append(item)
            else:
                logger.warning(f"Filtered out non-numeric document ID: {doc_id}")

        if len(valid_results) != len(formatted_results):
            logger.info(f"Filtered {len(formatted_results) - len(valid_results)} results with non-numeric IDs")
            formatted_results = valid_results

        # Filter out low-score results
        if SEARCH_MIN_SCORE > 0:
            initial_count = len(formatted_results)
            formatted_results = [r for r in formatted_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
            if len(formatted_results) != initial_count:
                logger.info(f"Filtered {initial_count - len(formatted_results)} results with score < {SEARCH_MIN_SCORE}")

        logger.info(f"Search for '{text}' returned {len(formatted_results)} valid results")

        if formatted_results:
            logger.info(f"Sample result: {formatted_results[0]}")
        else:
            logger.warning(f"No results found for '{text}'")

        if SEARCH_CACHE_ENABLED:
            logger.info(f"Storing {len(formatted_results)} results in cache for query '{text}'")
-           await self.cache.store(text, formatted_results)
-           # Return the proper page slice from the full results stored in cache
-           end_idx = offset + limit
+           await self.cache.store(text, formatted_results)
+           end_idx = offset + limit
            page_results = formatted_results[offset:end_idx]
            logger.info(f"Returning results from {offset} to {end_idx} (of {len(formatted_results)} total)")
            return page_results

        return formatted_results
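With caching enabled, search fetches one large prefetch window, stores the full list, and slices the requested page out of it, so later pages of the same query are served from the cache instead of the backend. A rough sketch of that pattern, assuming a cache with the get/store methods used above and an async backend_search callable; the wrapper name is hypothetical:

async def cached_search(cache, backend_search, text: str, limit: int, offset: int, prefetch: int = 200):
    """Serve a page from cache when possible; otherwise prefetch a large window, cache it, slice out the page."""
    page = await cache.get(text, limit, offset)
    if page is not None:
        return page
    full = await backend_search(text, limit=prefetch, offset=0)
    await cache.store(text, full)
    return full[offset:offset + limit]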
@@ -646,9 +590,7 @@ class SearchService:
            response = await self.client.get("/index-status")
            response.raise_for_status()
            result = response.json()
            logger.info(f"Index status check: {result['status']}, {result['documents_count']} documents")

            # Log warnings for any inconsistencies
            if result.get("consistency", {}).get("status") != "ok":
                null_count = result.get("consistency", {}).get("null_embeddings_count", 0)
                if null_count > 0:
@@ -674,126 +616,69 @@ async def get_search_count(text: str):
    if search_service.available and SEARCH_CACHE_ENABLED:
        if await search_service.cache.has_query(text):
            return await search_service.cache.get_total_count(text)
    # If not cached, we'll need to perform the full search once
    results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
    return len(results)

async def initialize_search_index(shouts_data):
    """Initialize search index with existing data during application startup"""
    if not SEARCH_ENABLED:
        logger.info("Search indexing skipped (SEARCH_ENABLED=False)")
        return

    if not shouts_data:
        logger.warning("No shouts data provided for search indexing")
        return

    logger.info(f"Checking search index status for {len(shouts_data)} documents")

    # Get the current index info
    info = await search_service.info()
    if info.get("status") in ["error", "unavailable", "disabled"]:
        logger.error(f"Cannot initialize search index: {info}")
        return

    # Check if index has approximately right number of documents
    index_stats = info.get("index_stats", {})
    indexed_doc_count = index_stats.get("document_count", 0)

    # Add a more detailed status check
    index_status = await search_service.check_index_status()
    if index_status.get("status") == "healthy":
        logger.info("Index status check passed")
    elif index_status.get("status") == "inconsistent":
        logger.warning("Index status check found inconsistencies")

        # Get documents with null embeddings
        if index_status.get("status") == "inconsistent":
            problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", [])

            if problem_ids:
                logger.info(f"Repairing {len(problem_ids)} documents with NULL embeddings")
                problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids]
                if problem_docs:
                    await search_service.bulk_index(problem_docs)

    # Log database document summary
    db_ids = [str(shout.id) for shout in shouts_data]
    logger.info(f"Database contains {len(shouts_data)} documents. Sample IDs: {', '.join(db_ids[:5])}...")

    # Calculate summary by ID range to understand the coverage
    try:
        # Parse numeric IDs where possible to analyze coverage
        numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()]
        if numeric_ids:
            min_id = min(numeric_ids)
            max_id = max(numeric_ids)
            id_range = max_id - min_id + 1
            coverage_pct = (len(numeric_ids) / id_range) * 100 if id_range > 0 else 0
            logger.info(f"ID range analysis: min_id={min_id}, max_id={max_id}, range={id_range}, "
                        f"coverage={coverage_pct:.1f}% ({len(numeric_ids)}/{id_range})")
    except Exception as e:
        logger.warning(f"Could not analyze ID ranges: {e}")
        pass

    # If counts are significantly different, do verification
    if abs(indexed_doc_count - len(shouts_data)) > 10:
        logger.info(f"Document count mismatch: {indexed_doc_count} in index vs {len(shouts_data)} in database. Verifying...")

        # Get all document IDs from your database
        doc_ids = [str(shout.id) for shout in shouts_data]

        # Verify which ones are missing from the index
        verification = await search_service.verify_docs(doc_ids)

        if verification.get("status") == "error":
            logger.error(f"Document verification failed: {verification.get('message')}")
            return

        # Index only missing documents
        missing_ids = verification.get("missing", [])
        if missing_ids:
            logger.info(f"Found {len(missing_ids)} documents missing from index. Indexing them...")
            logger.info(f"Sample missing IDs: {', '.join(missing_ids[:10])}...")
            missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids]
            await search_service.bulk_index(missing_docs)
        else:
            logger.info("All documents are already indexed.")
    else:
        logger.info(f"Search index appears to be in sync ({indexed_doc_count} documents indexed).")
        pass

    # Optional sample verification (can be slow with large document sets)
    # Uncomment if you want to periodically check a random sample even when counts match
    """
    sample_size = 10
    if len(db_ids) > sample_size:
        sample_ids = random.sample(db_ids, sample_size)
        logger.info(f"Performing random sample verification on {sample_size} documents...")
        verification = await search_service.verify_docs(sample_ids)
        if verification.get("missing"):
            missing_count = len(verification.get("missing", []))
            logger.warning(f"Random verification found {missing_count}/{sample_size} missing docs "
                           f"despite count match. Consider full verification.")
        else:
            logger.info("Random document sample verification passed.")
    """

    # Verify with test query
    try:
        test_query = "test"
        logger.info(f"Verifying search index with query: '{test_query}'")
        test_results = await search_text(test_query, 5)

        if test_results:
            logger.info(f"Search verification successful: found {len(test_results)} results")
            # Log categories covered by search results
            categories = set()
            for result in test_results:
                result_id = result.get("id")
                matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
                if matching_shouts and hasattr(matching_shouts[0], 'category'):
                    categories.add(getattr(matching_shouts[0], 'category', 'unknown'))
            if categories:
                logger.info(f"Search results cover categories: {', '.join(categories)}")
        else:
            logger.warning("Search verification returned no results. Index may be empty or not working.")
    except Exception as e:
        logger.error(f"Error verifying search index: {e}")
        pass
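The ID-range analysis in initialize_search_index is plain arithmetic: coverage is the share of the min..max ID span that actually exists in the database. A worked sketch:

def id_coverage(numeric_ids: list[int]) -> float:
    """Percentage of the min..max ID span that is actually present."""
    if not numeric_ids:
        return 0.0
    id_range = max(numeric_ids) - min(numeric_ids) + 1
    return (len(numeric_ids) / id_range) * 100

# e.g. IDs [1, 2, 3, 7, 10] span a range of 10, so coverage is 5/10 = 50.0%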