docs + admin-fix + search-linter-fixes
All checks were successful
Deploy on push / deploy (push) Successful in 6s

This commit is contained in:
2025-06-03 12:46:54 +03:00
parent 0375939e73
commit 6edc0ed3db
10 changed files with 542 additions and 70 deletions

View File

@@ -4,9 +4,9 @@ import logging
import os
import secrets
import time
from typing import Any, Optional
from typing import Any, Optional, cast
import httpx
from httpx import AsyncClient, Response
# Set up proper logging
logger = logging.getLogger("search")
@@ -46,8 +46,8 @@ class SearchCache:
"""Cache for search results to enable efficient pagination"""
def __init__(self, ttl_seconds: int = SEARCH_CACHE_TTL_SECONDS, max_items: int = 100) -> None:
self.cache = {} # Maps search query to list of results
self.last_accessed = {} # Maps search query to last access timestamp
self.cache: dict[str, list] = {} # Maps search query to list of results
self.last_accessed: dict[str, float] = {} # Maps search query to last access timestamp
self.ttl = ttl_seconds
self.max_items = max_items
self._redis_prefix = "search_cache:"
@@ -191,8 +191,8 @@ class SearchService:
logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
self.available = SEARCH_ENABLED
# Use different timeout settings for indexing and search requests
self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
self.client = AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
self.index_client = AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
# Initialize search cache
self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None
@@ -208,7 +208,7 @@ class SearchService:
if not self.available:
return {"status": "disabled"}
try:
response = await self.client.get("/info")
response: Response = await self.client.get("/info")
response.raise_for_status()
result = response.json()
logger.info(f"Search service info: {result}")
@@ -228,7 +228,7 @@ class SearchService:
try:
logger.info(f"Verifying {len(doc_ids)} documents in search index")
response = await self.client.post(
response: Response = await self.client.post(
"/verify-docs",
json={"doc_ids": doc_ids},
timeout=60.0, # Longer timeout for potentially large ID lists
@@ -358,10 +358,23 @@ class SearchService:
for i, response in enumerate(responses):
if isinstance(response, Exception):
logger.error(f"Error in indexing task {i}: {response}")
elif hasattr(response, "status_code") and response.status_code >= 400:
logger.error(
f"Error response in indexing task {i}: {response.status_code}, {await response.text()}"
)
elif hasattr(response, "status_code") and getattr(response, "status_code", 0) >= 400:
error_text = ""
if hasattr(response, "text") and isinstance(response.text, str):
error_text = response.text
elif hasattr(response, "text") and callable(response.text):
try:
# Get the response text, accounting for different Response implementations
http_response = cast(Response, response)
# In some httpx versions, text is a property rather than a method
if callable(http_response.text):
error_text = await http_response.text()
else:
error_text = str(http_response.text)
except Exception as e:
error_text = f"[unable to get response text: {e}]"
logger.error(f"Error response in indexing task {i}: {response.status_code}, {error_text}")
logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints")
else:
@@ -556,7 +569,7 @@ class SearchService:
while not success and retry_count < max_retries:
try:
response = await self.index_client.post(endpoint, json=batch, timeout=90.0)
response: Response = await self.index_client.post(endpoint, json=batch, timeout=90.0)
if response.status_code == 422:
error_detail = response.json()
@@ -591,7 +604,7 @@ class SearchService:
)
break
wait_time = (2**retry_count) + (secrets.random() * 0.5)
wait_time = (2**retry_count) + (secrets.randbelow(500) / 1000)
await asyncio.sleep(wait_time)
def _truncate_error_detail(self, error_detail: Any) -> Any:
@@ -634,7 +647,7 @@ class SearchService:
return []
# Check if we can serve from cache
if SEARCH_CACHE_ENABLED:
if SEARCH_CACHE_ENABLED and self.cache is not None:
has_cache = await self.cache.has_query(text)
if has_cache:
cached_results = await self.cache.get(text, limit, offset)
@@ -648,7 +661,7 @@ class SearchService:
logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})")
response = await self.client.post(
response: Response = await self.client.post(
"/search-combined",
json={"text": text, "limit": search_limit},
)
@@ -664,10 +677,10 @@ class SearchService:
if len(valid_results) != len(formatted_results):
formatted_results = valid_results
if SEARCH_CACHE_ENABLED:
if SEARCH_CACHE_ENABLED and self.cache is not None:
# Store the full prefetch batch, then page it
await self.cache.store(text, formatted_results)
return await self.cache.get(text, limit, offset)
return await self.cache.get(text, limit, offset) or []
return formatted_results
except Exception:
@@ -682,7 +695,7 @@ class SearchService:
cache_key = f"author:{text}"
# Check if we can serve from cache
if SEARCH_CACHE_ENABLED:
if SEARCH_CACHE_ENABLED and self.cache is not None:
has_cache = await self.cache.has_query(cache_key)
if has_cache:
cached_results = await self.cache.get(cache_key, limit, offset)
@@ -696,7 +709,7 @@ class SearchService:
logger.info(
f"Searching authors for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})"
)
response = await self.client.post("/search-author", json={"text": text, "limit": search_limit})
response: Response = await self.client.post("/search-author", json={"text": text, "limit": search_limit})
response.raise_for_status()
result = response.json()
@@ -707,10 +720,10 @@ class SearchService:
if len(valid_results) != len(author_results):
author_results = valid_results
if SEARCH_CACHE_ENABLED:
if SEARCH_CACHE_ENABLED and self.cache is not None:
# Store the full prefetch batch, then page it
await self.cache.store(cache_key, author_results)
return await self.cache.get(cache_key, limit, offset)
return await self.cache.get(cache_key, limit, offset) or []
return author_results[offset : offset + limit]
@@ -724,7 +737,7 @@ class SearchService:
return {"status": "disabled"}
try:
response = await self.client.get("/index-status")
response: Response = await self.client.get("/index-status")
response.raise_for_status()
result = response.json()
@@ -738,6 +751,14 @@ class SearchService:
logger.exception("Failed to check index status")
return {"status": "error", "message": "Failed to check index status"}
async def close(self) -> None:
    """Close connections and release resources.

    Shuts down both httpx AsyncClients (search and indexing) if they
    were created, then logs the shutdown. Safe to call even if
    __init__ did not complete (guards with getattr).
    """
    # Both clients are closed in the same order they were created.
    for attr_name in ("client", "index_client"):
        http_client = getattr(self, attr_name, None)
        if http_client:
            await http_client.aclose()
    logger.info("Search service closed")
# Create the search service singleton
# NOTE(review): instantiated at import time, so the httpx AsyncClients in
# SearchService.__init__ are created on module load — confirm this is intended.
search_service = SearchService()
@@ -764,7 +785,7 @@ async def get_search_count(text: str) -> int:
if not search_service.available:
return 0
if SEARCH_CACHE_ENABLED and await search_service.cache.has_query(text):
if SEARCH_CACHE_ENABLED and search_service.cache is not None and await search_service.cache.has_query(text):
return await search_service.cache.get_total_count(text)
# If not found in cache, fetch from endpoint
@@ -776,10 +797,9 @@ async def get_author_search_count(text: str) -> int:
if not search_service.available:
return 0
if SEARCH_CACHE_ENABLED:
cache_key = f"author:{text}"
if await search_service.cache.has_query(cache_key):
return await search_service.cache.get_total_count(cache_key)
cache_key = f"author:{text}"
if SEARCH_CACHE_ENABLED and search_service.cache is not None and await search_service.cache.has_query(cache_key):
return await search_service.cache.get_total_count(cache_key)
# If not found in cache, fetch from endpoint
return len(await search_author_text(text, SEARCH_PREFETCH_SIZE, 0))