57 Commits

Author SHA1 Message Date
Stepan Vladovskiy
27b0928e73 feat: title weight procedure 2025-04-15 19:32:34 -03:00
Stepan Vladovskiy
e382cc1ea5 Merge branch 'dev' into feat/sv-searching-txtai 2025-04-15 19:20:48 -03:00
6920351b82 schema-fix 2025-04-15 20:30:12 +03:00
eb216a5f36 draft-seo-handling 2025-04-15 20:16:01 +03:00
bd129efde6 update-seo-handling 2025-04-15 20:14:42 +03:00
b9f6033e66 generate seo text when draft created 2025-04-15 20:09:22 +03:00
710f522c8f schema-upgrade 2025-04-14 19:53:14 +03:00
0de4404cb1 draft-community 2025-04-14 16:02:19 +03:00
83d61ca76d Merge branch 'dev' into feat/sv-searching-txtai 2025-04-13 05:36:18 +00:00
1c61e889d6 update-draft-fix 2025-04-10 22:51:07 +03:00
fdedb75a2c topics-comments-stat 2025-04-10 19:14:27 +03:00
f20000f1f6 topic.stat.authors-fix2 2025-04-10 18:46:09 +03:00
7d50638b3a topic.stat.authors-fix 2025-04-10 18:39:31 +03:00
Stepan Vladovskiy
106222b0e0 debug: without debug logging. clean 2025-04-07 11:41:48 -03:00
Stepan Vladovskiy
c533241d1e fix(reader): sorting by rang not by id in cash 2025-04-03 13:51:13 -03:00
Stepan Vladovskiy
78326047bf fix(reader.py): change sorting and answer on querys 2025-04-03 13:20:18 -03:00
Stepan Vladovskiy
bc4ec79240 fix(search.py): store all results in cash not only first offset 2025-04-03 13:10:53 -03:00
Stepan Vladovskiy
a0db5707c4 feat: add cash for storing searchresalts and hold them for working pagination. Now we are have offset for use on frontend 2025-04-01 16:01:09 -03:00
Stepan Vladovskiy
ecc443c3ad refactor(reader.py): Remove the unnecessary topic joins that cause duplicate results 2025-04-01 12:57:46 -03:00
Stepan Vladovskiy
9a02ca74ad merged with dev 2025-03-31 13:38:32 -03:00
Stepan Vladovskiy
9ebb81cbd3 refactor(reader.py): rm debug line 2025-03-31 13:32:51 -03:00
abbc074474 updateby-fix 2025-03-31 14:39:02 +03:00
Stepan Vladovskiy
0bc55977ac debug(reader.py): query_with_stat(info) always 2025-03-27 15:18:08 -03:00
Stepan Vladovskiy
ff3a4debce debug(reader.py): trying to handle main topic ids founded 2025-03-27 14:43:17 -03:00
Stepan Vladovskiy
ae85b32f69 feat(type.qraphql): SearchResult with shout id 2025-03-27 14:06:52 -03:00
Stepan Vladovskiy
34a354e9e3 debug(reader.py: trying back shout id in query call 2025-03-27 11:54:56 -03:00
4f599e097f [0.4.17] - 2025-03-26
- Fixed `'Reaction' object is not subscriptable` error in hierarchical comments:
  - Modified `get_reactions_with_stat()` to convert Reaction objects to dictionaries
  - Added default values for limit/offset parameters
  - Fixed `load_first_replies()` implementation with proper parameter passing
  - Added doctest with example usage
  - Limited child comments to 100 per parent for performance
2025-03-26 08:54:10 +03:00
a5eaf4bb65 commented->comments_count 2025-03-26 08:25:18 +03:00
Stepan Vladovskiy
e405fb527b refactor(search.py): moved to use one table docs for embdings and docs store 2025-03-25 16:42:44 -03:00
Stepan Vladovskiy
7f36f93d92 feat(search.py): detects both missing documents and null embeddings 2025-03-25 15:18:29 -03:00
Stepan Vladovskiy
f089a32394 debug(search.py): with more logs when check sync of indexing 2025-03-25 14:44:05 -03:00
Stepan Vladovskiy
1fd623a660 feat: with index sync endpoints configs 2025-03-25 13:31:45 -03:00
Stepan Vladovskiy
88012f1b8c debug(server.py): with 4 workers (threds). cheking reindexing 2025-03-25 12:21:59 -03:00
Stepan Vladovskiy
6e284640c0 feat: give little timeout for resource stab 2025-03-24 21:42:51 -03:00
Stepan Vladovskiy
077cb46482 debug: server.py -> threds 1 , search.py -> add 3 times reconect 2025-03-24 20:16:07 -03:00
Stepan Vladovskiy
60a13a9097 refactor(search.py): moved initialization logic in search-txtai instance 2025-03-24 19:47:02 -03:00
3c56fdfaea get_topics_paginated-fix 2025-03-22 18:49:15 +03:00
81a8bf3c58 query-type-fix 2025-03-22 18:44:31 +03:00
Stepan Vladovskiy
316375bf18 debug(search.py): encrease batch size for bulk indexing 2025-03-21 17:56:54 -03:00
Stepan Vladovskiy
fb820f67fd debug(search.py): encrease batch size for bulk indexing 2025-03-21 17:48:26 -03:00
Stepan Vladovskiy
f1d9f4e036 feat(search.py): with db reset endpoint 2025-03-21 17:28:54 -03:00
Stepan Vladovskiy
ebb67eb311 debug: decrease chars in search.py for bulk indexing 2025-03-21 16:53:00 -03:00
Stepan Vladovskiy
50a8c24ead feat(search.py): documnet for bulk indexing are categorized 2025-03-21 15:40:29 -03:00
Stepan Vladovskiy
eb4b9363ab debug: change logs entris and indexing not wraps all in documents 2025-03-21 14:32:45 -03:00
Stepan Vladovskiy
19c5028a0c debug: Limit max chars for bulk indexing 2025-03-21 14:18:32 -03:00
Stepan Vladovskiy
57e1e8e6bd debug: more logs in indexing 2025-03-21 14:10:09 -03:00
Stepan Vladovskiy
385057ffcd debug: with logs in indexing procedure 2025-03-21 13:45:50 -03:00
Stepan Vladovskiy
90699768ff debug: start index 2025-03-21 13:30:23 -03:00
Stepan Vladovskiy
ad0ca75aa9 debug: no redis for indexing in nackend side 2025-03-19 14:47:31 -03:00
Stepan Vladovskiy
39242d5e6c debug: add logs in search.py and change and input validation ... index ver too 2025-03-12 14:13:55 -03:00
Stepan Vladovskiy
24cca7f2cb debug: something wrong one stap back with logs 2025-03-12 13:11:19 -03:00
Stepan Vladovskiy
a9c7ac49d6 feat: with logs >>> 2025-03-12 13:07:27 -03:00
Stepan Vladovskiy
f249752db5 feat: moved txtai and search procedure in different instance 2025-03-12 12:06:09 -03:00
Stepan Vladovskiy
c0b2116da2 feat(db.py): added fetch_all_shouts, to populate the search index 2025-03-05 20:32:34 +00:00
Stepan Vladovskiy
59e71c8144 debug: fixed workflows gitea 2025-03-05 20:17:34 +00:00
Stepan Vladovskiy
e6a416383d debug: fixed workflows gitea 2025-03-05 20:16:32 +00:00
Stepan Vladovskiy
d55448398d feat(search.py): change to txtai server, with ai model. And fix granian workers 2025-03-05 20:08:21 +00:00
23 changed files with 1038 additions and 333 deletions

View File

@@ -29,7 +29,16 @@ jobs:
       if: github.ref == 'refs/heads/dev'
       uses: dokku/github-action@master
       with:
-        branch: 'dev'
+        branch: 'main'
         force: true
         git_remote_url: 'ssh://dokku@v2.discours.io:22/core'
         ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
+    - name: Push to dokku for staging branch
+      if: github.ref == 'refs/heads/staging'
+      uses: dokku/github-action@master
+      with:
+        branch: 'dev'
+        git_remote_url: 'ssh://dokku@staging.discours.io:22/core'
+        ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
+        git_push_flags: '--force'

.gitignore
View File

@@ -128,6 +128,9 @@ dmypy.json
 .idea
 temp.*

+# Debug
+DEBUG.log
+
 discours.key
 discours.crt
 discours.pem
@@ -162,3 +165,4 @@ views.json
 *.crt
 *cache.json
 .cursor
+.devcontainer/

View File

@@ -1,9 +1,29 @@
+#### [0.4.19] - 2025-04-14
+- dropped `Shout.description` and `Draft.description` to be UX-generated
+- use redis to init views counters after migrator
+
+#### [0.4.18] - 2025-04-10
+- Fixed `Topic.stat.authors` and `Topic.stat.comments`
+- Fixed unique constraint violation for empty slug values:
+  - Modified `update_draft` resolver to handle empty slug values
+  - Modified `create_draft` resolver to prevent empty slug values
+  - Added validation to prevent inserting or updating drafts with empty slug
+  - Fixed database error "duplicate key value violates unique constraint draft_slug_key"
+
+#### [0.4.17] - 2025-03-26
+- Fixed `'Reaction' object is not subscriptable` error in hierarchical comments:
+  - Modified `get_reactions_with_stat()` to convert Reaction objects to dictionaries
+  - Added default values for limit/offset parameters
+  - Fixed `load_first_replies()` implementation with proper parameter passing
+  - Added doctest with example usage
+  - Limited child comments to 100 per parent for performance
+
 #### [0.4.16] - 2025-03-22
 - Added hierarchical comments pagination:
   - Created new GraphQL query `load_comments_branch` for efficient loading of hierarchical comments
   - Ability to load root comments with their first N replies
   - Added pagination for both root and child comments
-  - Using existing `commented` field in `Stat` type to display number of replies
+  - Using existing `comments_count` field in `Stat` type to display number of replies
   - Added special `first_replies` field to store first replies to a comment
   - Optimized SQL queries for efficient loading of comment hierarchies
   - Implemented flexible comment sorting system (by time, rating)
@@ -41,8 +61,7 @@
 - Implemented persistent Redis caching for author queries without TTL (invalidated only on changes)
 - Optimized author retrieval with separate endpoints:
   - `get_authors_all` - returns all non-deleted authors without statistics
-  - `get_authors_paginated` - returns authors with statistics and pagination support
-  - `load_authors_by` - optimized to use caching and efficient sorting
+  - `load_authors_by` - optimized to use caching and efficient sorting and pagination
 - Improved SQL queries with optimized JOIN conditions and efficient filtering
 - Added pre-aggregation of statistics (shouts count, followers count) in single efficient queries
 - Implemented robust cache invalidation on author updates
@@ -54,7 +73,6 @@
 - Implemented persistent Redis caching for topic queries (no TTL, invalidated only on changes)
 - Optimized topic retrieval with separate endpoints for different use cases:
   - `get_topics_all` - returns all topics without statistics for lightweight listing
-  - `get_topics_paginated` - returns topics with statistics and pagination support
   - `get_topics_by_community` - adds pagination and optimized filtering by community
 - Added SQLAlchemy-managed indexes directly in ORM models for automatic schema maintenance
 - Created `sync_indexes()` function for automatic index synchronization during app startup
@@ -152,7 +170,7 @@
 #### [0.4.4]
 - `followers_stat` removed for shout
 - sqlite3 support added
-- `rating_stat` and `commented_stat` fixes
+- `rating_stat` and `comments_count` fixes

 #### [0.4.3]
 - cache reimplemented

cache/cache.py
View File

@@ -545,8 +545,9 @@ async def get_cached_data(key: str) -> Optional[Any]:
     try:
         cached_data = await redis.execute("GET", key)
         if cached_data:
-            logger.debug(f"Данные получены из кеша по ключу {key}")
-            return orjson.loads(cached_data)
+            loaded = orjson.loads(cached_data)
+            logger.debug(f"Данные получены из кеша по ключу {key}: {len(loaded)}")
+            return loaded
         return None
     except Exception as e:
         logger.error(f"Ошибка при получении данных из кеша: {e}")

View File

@@ -45,7 +45,7 @@ query LoadCommentsBranch(
       reply_to
       stat {
         rating
-        commented
+        comments_count
       }
       first_replies {
         id
@@ -61,7 +61,7 @@ query LoadCommentsBranch(
         reply_to
         stat {
           rating
-          commented
+          comments_count
         }
       }
     }
@@ -92,7 +92,7 @@ query LoadCommentsBranch(
 - `reply_to`: ID родительского комментария (null для корневых)
 - `first_replies`: Первые N дочерних комментариев
 - `stat`: Статистика комментария, включающая:
-  - `commented`: Количество ответов на комментарий
+  - `comments_count`: Количество ответов на комментарий
   - `rating`: Рейтинг комментария

 ## Примеры использования
@@ -150,7 +150,7 @@ const { data } = await client.query({
 1. Для эффективной работы со сложными ветками обсуждений рекомендуется:
    - Сначала загружать только корневые комментарии с первыми N ответами
-   - При наличии дополнительных ответов (когда `stat.commented > first_replies.length`)
+   - При наличии дополнительных ответов (когда `stat.comments_count > first_replies.length`)
      добавить кнопку "Показать все ответы"
    - При нажатии на кнопку загружать дополнительные ответы с помощью запроса с указанным `parentId`

View File

@@ -42,7 +42,7 @@
 - Отдельный запрос `load_comments_branch` для оптимизированной загрузки ветки комментариев
 - Возможность загрузки корневых комментариев статьи с первыми ответами на них
 - Гибкая пагинация как для корневых, так и для дочерних комментариев
-- Использование поля `stat.commented` для отображения количества ответов на комментарий
+- Использование поля `stat.comments_count` для отображения количества ответов на комментарий
 - Добавление специального поля `first_replies` для хранения первых ответов на комментарий
 - Поддержка различных методов сортировки (новые, старые, популярные)
 - Оптимизированные SQL запросы для минимизации нагрузки на базу данных

main.py
View File

@@ -17,7 +17,8 @@ from cache.revalidator import revalidation_manager
 from services.exception import ExceptionHandlerMiddleware
 from services.redis import redis
 from services.schema import create_all_tables, resolvers
-from services.search import search_service
+#from services.search import search_service
+from services.search import search_service, initialize_search_index
 from services.viewed import ViewedStorage
 from services.webhook import WebhookEndpoint, create_webhook_endpoint
 from settings import DEV_SERVER_PID_FILE_NAME, MODE
@@ -34,24 +35,67 @@ async def start():
         f.write(str(os.getpid()))
     print(f"[main] process started in {MODE} mode")


+async def check_search_service():
+    """Check if search service is available and log result"""
+    info = await search_service.info()
+    if info.get("status") in ["error", "unavailable"]:
+        print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}")
+    else:
+        print(f"[INFO] Search service is available: {info}")
+
+
+# indexing DB data
+# async def indexing():
+#     from services.db import fetch_all_shouts
+#     all_shouts = await fetch_all_shouts()
+#     await initialize_search_index(all_shouts)
+
+
 async def lifespan(_app):
     try:
+        print("[lifespan] Starting application initialization")
         create_all_tables()
         await asyncio.gather(
             redis.connect(),
             precache_data(),
             ViewedStorage.init(),
             create_webhook_endpoint(),
-            search_service.info(),
+            check_search_service(),
             start(),
             revalidation_manager.start(),
         )
+        print("[lifespan] Basic initialization complete")
+
+        # Add a delay before starting the intensive search indexing
+        print("[lifespan] Waiting for system stabilization before search indexing...")
+        await asyncio.sleep(10)  # 10-second delay to let the system stabilize
+
+        # Start search indexing as a background task with lower priority
+        asyncio.create_task(initialize_search_index_background())
+
         yield
     finally:
+        print("[lifespan] Shutting down application services")
         tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
         await asyncio.gather(*tasks, return_exceptions=True)
+        print("[lifespan] Shutdown complete")
+
+
+# Initialize search index in the background
+async def initialize_search_index_background():
+    """Run search indexing as a background task with low priority"""
+    try:
+        print("[search] Starting background search indexing process")
+        from services.db import fetch_all_shouts
+
+        # Get total count first (optional)
+        all_shouts = await fetch_all_shouts()
+        total_count = len(all_shouts) if all_shouts else 0
+        print(f"[search] Fetched {total_count} shouts for background indexing")
+
+        # Start the indexing process with the fetched shouts
+        print("[search] Beginning background search index initialization...")
+        await initialize_search_index(all_shouts)
+        print("[search] Background search index initialization complete")
+    except Exception as e:
+        print(f"[search] Error in background search indexing: {str(e)}")
+
+
 # Создаем экземпляр GraphQL
 graphql_app = GraphQL(schema, debug=True)

View File

@@ -31,6 +31,7 @@ class Draft(Base):
     # required
     created_at: int = Column(Integer, nullable=False, default=lambda: int(time.time()))
     created_by: int = Column(ForeignKey("author.id"), nullable=False)
+    community: int = Column(ForeignKey("community.id"), nullable=False, default=1)

     # optional
     layout: str = Column(String, nullable=True, default="article")
@@ -38,7 +39,6 @@ class Draft(Base):
     title: str = Column(String, nullable=True)
     subtitle: str | None = Column(String, nullable=True)
     lead: str | None = Column(String, nullable=True)
-    description: str | None = Column(String, nullable=True)
     body: str = Column(String, nullable=False, comment="Body")
     media: dict | None = Column(JSON, nullable=True)
     cover: str | None = Column(String, nullable=True, comment="Cover image url")

View File

@@ -91,7 +91,6 @@ class Shout(Base):
     cover: str | None = Column(String, nullable=True, comment="Cover image url")
     cover_caption: str | None = Column(String, nullable=True, comment="Cover image alt caption")
     lead: str | None = Column(String, nullable=True)
-    description: str | None = Column(String, nullable=True)
     title: str = Column(String, nullable=False)
     subtitle: str | None = Column(String, nullable=True)
     layout: str = Column(String, nullable=False, default="article")

View File

@@ -13,5 +13,10 @@ starlette
 gql
 ariadne
 granian
+
+# NLP and search
+httpx
+
 orjson
 pydantic
+trafilatura

View File

@@ -232,22 +232,6 @@ async def get_authors_all(_, _info):
     return await get_all_authors()


-@query.field("get_authors_paginated")
-async def get_authors_paginated(_, _info, limit=50, offset=0, by=None):
-    """
-    Получает список авторов с пагинацией и статистикой.
-
-    Args:
-        limit: Максимальное количество возвращаемых авторов
-        offset: Смещение для пагинации
-        by: Параметр сортировки (new/active)
-
-    Returns:
-        list: Список авторов с их статистикой
-    """
-    return await get_authors_with_stats(limit, offset, by)
-
-
 @query.field("get_author")
 async def get_author(_, _info, slug="", author_id=0):
     author_dict = None

View File

@@ -1,6 +1,7 @@
 import time
 from operator import or_

+import trafilatura
 from sqlalchemy.sql import and_

 from cache.cache import (
@@ -30,7 +31,6 @@ def create_shout_from_draft(session, draft, author_id):
         cover=draft.cover,
         cover_caption=draft.cover_caption,
         lead=draft.lead,
-        description=draft.description,
         title=draft.title,
         subtitle=draft.subtitle,
         layout=draft.layout,
@@ -105,12 +105,21 @@ async def create_draft(_, info, draft_input):
         if "title" not in draft_input or not draft_input["title"]:
             draft_input["title"] = ""  # Пустая строка вместо NULL

+        # Проверяем slug - он должен быть или не пустым, или не передаваться вообще
+        if "slug" in draft_input and (draft_input["slug"] is None or draft_input["slug"] == ""):
+            # При создании черновика удаляем пустой slug из входных данных
+            del draft_input["slug"]
+
         try:
             with local_session() as session:
                 # Remove id from input if present since it's auto-generated
                 if "id" in draft_input:
                     del draft_input["id"]

+                if "seo" not in draft_input and not draft_input["seo"]:
+                    body_teaser = draft_input.get("body", "")[:300].split("\n")[:-1].join("\n")
+                    draft_input["seo"] = draft_input.get("lead", body_teaser)
+
                 # Добавляем текущее время создания
                 draft_input["created_at"] = int(time.time())
@@ -142,13 +151,34 @@ async def update_draft(_, info, draft_id: int, draft_input):
     if not user_id or not author_id:
         return {"error": "Author ID are required"}

+    # Проверяем slug - он должен быть или не пустым, или не передаваться вообще
+    if "slug" in draft_input and (draft_input["slug"] is None or draft_input["slug"] == ""):
+        # Если slug пустой, либо удаляем его из входных данных, либо генерируем временный уникальный
+        # Вариант 1: просто удаляем ключ из входных данных, чтобы оставить старое значение
+        del draft_input["slug"]
+        # Вариант 2 (если нужно обновить): генерируем временный уникальный slug
+        # import uuid
+        # draft_input["slug"] = f"draft-{uuid.uuid4().hex[:8]}"
+
     with local_session() as session:
         draft = session.query(Draft).filter(Draft.id == draft_id).first()
         if not draft:
             return {"error": "Draft not found"}

+        if "seo" not in draft_input and not draft.seo:
+            body_src = draft_input["body"] if "body" in draft_input else draft.body
+            body_text = trafilatura.extract(body_src)
+            lead_src = draft_input["lead"] if "lead" in draft_input else draft.lead
+            lead_text = trafilatura.extract(lead_src)
+            body_teaser = body_text[:300].split(". ")[:-1].join(".\n")
+            draft_input["seo"] = lead_text or body_teaser
+
         Draft.update(draft, draft_input)
-        draft.updated_at = int(time.time())
+        # Set updated_at and updated_by from the authenticated user
+        current_time = int(time.time())
+        draft.updated_at = current_time
+        draft.updated_by = author_id

         session.commit()
         return {"draft": draft}
@@ -249,7 +279,6 @@ async def publish_shout(_, info, shout_id: int):
     shout.cover = draft.cover
     shout.cover_caption = draft.cover_caption
     shout.lead = draft.lead
-    shout.description = draft.description
     shout.layout = draft.layout
     shout.media = draft.media
     shout.lang = draft.lang

View File

@@ -1,6 +1,7 @@
 import time

 import orjson
+import trafilatura
 from sqlalchemy import and_, desc, select
 from sqlalchemy.orm import joinedload
 from sqlalchemy.sql.functions import coalesce
@@ -176,9 +177,16 @@ async def create_shout(_, info, inp):
         logger.info(f"Creating shout with input: {inp}")

         # Создаем публикацию без topics
+        body = inp.get("body", "")
+        lead = inp.get("lead", "")
+        body_text = trafilatura.extract(body)
+        lead_text = trafilatura.extract(lead)
+        seo = inp.get("seo", lead_text or body_text[:300].split(". ")[:-1].join(". "))
         new_shout = Shout(
             slug=slug,
-            body=inp.get("body", ""),
+            body=body,
+            seo=seo,
+            lead=lead,
             layout=inp.get("layout", "article"),
             title=inp.get("title", ""),
             created_by=author_id,
@@ -380,7 +388,7 @@ def patch_topics(session, shout, topics_input):
 # @login_required
 async def update_shout(_, info, shout_id: int, shout_input=None, publish=False):
     logger.info(f"Starting update_shout with id={shout_id}, publish={publish}")
-    logger.debug(f"Full shout_input: {shout_input}")
+    logger.debug(f"Full shout_input: {shout_input}")  # DraftInput
     user_id = info.context.get("user_id")
     roles = info.context.get("roles", [])

View File

@@ -67,30 +67,35 @@ def add_reaction_stat_columns(q):
     return q


-def get_reactions_with_stat(q, limit, offset):
+def get_reactions_with_stat(q, limit=10, offset=0):
     """
     Execute the reaction query and retrieve reactions with statistics.

     :param q: Query with reactions and statistics.
     :param limit: Number of reactions to load.
     :param offset: Pagination offset.
-    :return: List of reactions.
+    :return: List of reactions as dictionaries.
+
+    >>> get_reactions_with_stat(q, 10, 0)  # doctest: +SKIP
+    [{'id': 1, 'body': 'Текст комментария', 'stat': {'rating': 5, 'comments_count': 3}, ...}]
     """
     q = q.limit(limit).offset(offset)
     reactions = []

     with local_session() as session:
         result_rows = session.execute(q)
-        for reaction, author, shout, commented_stat, rating_stat in result_rows:
+        for reaction, author, shout, comments_count, rating_stat in result_rows:
             # Пропускаем реакции с отсутствующими shout или author
             if not shout or not author:
                 logger.error(f"Пропущена реакция из-за отсутствия shout или author: {reaction.dict()}")
                 continue

-            reaction.created_by = author.dict()
-            reaction.shout = shout.dict()
-            reaction.stat = {"rating": rating_stat, "comments": commented_stat}
-            reactions.append(reaction)
+            # Преобразуем Reaction в словарь для доступа по ключу
+            reaction_dict = reaction.dict()
+            reaction_dict["created_by"] = author.dict()
+            reaction_dict["shout"] = shout.dict()
+            reaction_dict["stat"] = {"rating": rating_stat, "comments_count": comments_count}
+            reactions.append(reaction_dict)

     return reactions
@@ -393,7 +398,7 @@ async def update_reaction(_, info, reaction):
             result = session.execute(reaction_query).unique().first()
             if result:
-                r, author, _shout, commented_stat, rating_stat = result
+                r, author, _shout, comments_count, rating_stat = result
                 if not r or not author:
                     return {"error": "Invalid reaction ID or unauthorized"}
@@ -408,7 +413,7 @@ async def update_reaction(_, info, reaction):
                     session.commit()

                     r.stat = {
-                        "commented": commented_stat,
+                        "comments_count": comments_count,
                         "rating": rating_stat,
                     }
@@ -713,7 +718,7 @@ async def load_comments_branch(
 async def load_replies_count(comments):
     """
-    Загружает количество ответов для списка комментариев и обновляет поле stat.commented.
+    Загружает количество ответов для списка комментариев и обновляет поле stat.comments_count.

     :param comments: Список комментариев, для которых нужно загрузить количество ответов.
     """
@@ -748,7 +753,7 @@ async def load_replies_count(comments):
             comment["stat"] = {}

         # Обновляем счетчик комментариев в stat
-        comment["stat"]["commented"] = replies_count.get(comment["id"], 0)
+        comment["stat"]["comments_count"] = replies_count.get(comment["id"], 0)


 async def load_first_replies(comments, limit, offset, sort="newest"):
@@ -793,8 +798,9 @@ async def load_first_replies(comments, limit, offset, sort="newest"):
     q = q.order_by(order_by_stmt, Reaction.reply_to)

-    # Выполняем запрос
-    replies = get_reactions_with_stat(q)
+    # Выполняем запрос - указываем limit для неограниченного количества ответов
+    # но не более 100 на родительский комментарий
+    replies = get_reactions_with_stat(q, limit=100, offset=0)

     # Группируем ответы по родительским ID
     replies_by_parent = {}

View File

@@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic
 from orm.topic import Topic
 from services.db import json_array_builder, json_builder, local_session
 from services.schema import query
-from services.search import search_text
+from services.search import search_text, get_search_count
 from services.viewed import ViewedStorage
 from utils.logger import root_logger as logger
@@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
     """
     shouts = []
     try:
-        # logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}")
         q = q.limit(limit).offset(offset)

         with local_session() as session:
             shouts_result = session.execute(q).all()
-            # logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query")

             if not shouts_result:
                 logger.warning("No shouts found in query result")
@@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                 shout = None
                 if hasattr(row, "Shout"):
                     shout = row.Shout
-                    # logger.debug(f"Processing shout#{shout.id} at index {idx}")

                 if shout:
                     shout_id = int(f"{shout.id}")
                     shout_dict = shout.dict()
@@ -225,26 +222,22 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                         elif isinstance(row.stat, dict):
                             stat = row.stat
                         viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0
-                        shout_dict["stat"] = {**stat, "viewed": viewed, "commented": stat.get("comments_count", 0)}
+                        shout_dict["stat"] = {**stat, "viewed": viewed}

                     # Обработка main_topic и topics
                     topics = None
                     if has_field(info, "topics") and hasattr(row, "topics"):
                         topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
-                        # logger.debug(f"Shout#{shout_id} topics: {topics}")
                         shout_dict["topics"] = topics

                     if has_field(info, "main_topic"):
                         main_topic = None
                         if hasattr(row, "main_topic"):
-                            # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
                             main_topic = (
                                 orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
                             )
-                            # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")

                         if not main_topic and topics and len(topics) > 0:
-                            # logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list")
                             main_topic = {
                                 "id": topics[0]["id"],
                                 "title": topics[0]["title"],
@@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
                                 "is_main": True,
                             }
                         elif not main_topic:
-                            logger.warning(f"No main_topic and no topics found for shout#{shout_id}")
                             main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
                         shout_dict["main_topic"] = main_topic
-                        # logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}")

                     if has_field(info, "authors") and hasattr(row, "authors"):
                         shout_dict["authors"] = (
@@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
         logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True)
         raise
     finally:
-        logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links")
         return shouts
@@ -401,8 +391,17 @@ async def load_shouts_search(_, info, text, options):
     """
     limit = options.get("limit", 10)
     offset = options.get("offset", 0)
+
     if isinstance(text, str) and len(text) > 2:
+        # Get search results with pagination
         results = await search_text(text, limit, offset)
+
+        # If no results, return empty list
+        if not results:
+            logger.info(f"No search results found for '{text}'")
+            return []
+
+        # Extract IDs and scores
         scores = {}
         hits_ids = []
         for sr in results:
@@ -412,22 +411,42 @@ async def load_shouts_search(_, info, text, options):
             scores[shout_id] = sr.get("score")
             hits_ids.append(shout_id)

-        q = (
-            query_with_stat(info)
-            if has_field(info, "stat")
-            else select(Shout).filter(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
-        )
+        # Query DB for only the IDs in the current page
+        q = query_with_stat(info)
         q = q.filter(Shout.id.in_(hits_ids))
-        q = apply_filters(q, options)
-        q = apply_sorting(q, options)
-        shouts = get_shouts_with_links(info, q, limit, offset)
+        q = apply_filters(q, options.get("filters", {}))
+        shouts = get_shouts_with_links(info, q, len(hits_ids), 0)
+
+        # Add scores from search results
         for shout in shouts:
-            shout.score = scores[f"{shout.id}"]
-        shouts.sort(key=lambda x: x.score, reverse=True)
+            shout_id = str(shout['id'])
+            shout["score"] = scores.get(shout_id, 0)
+
+        # Re-sort by search score to maintain ranking
+        shouts.sort(key=lambda x: scores.get(str(x['id']), 0), reverse=True)
+
         return shouts
+
     return []
+
+
+@query.field("get_search_results_count")
+async def get_search_results_count(_, info, text):
+    """
+    Returns the total count of search results for a search query.
+
+    :param _: Root query object (unused)
+    :param info: GraphQL context information
+    :param text: Search query text
+    :return: Total count of results
+    """
+    if isinstance(text, str) and len(text) > 2:
+        count = await get_search_count(text)
+        return {"count": count}
+    return {"count": 0}


 @query.field("load_shouts_unrated")
 async def load_shouts_unrated(_, info, options):
     """

View File

@@ -127,6 +127,28 @@ async def get_topics_with_stats(limit=100, offset=0, community_id=None, by=None):
         """
         followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))}

+        # Запрос на получение статистики авторов для выбранных тем
+        authors_stats_query = f"""
+            SELECT st.topic, COUNT(DISTINCT sa.author) as authors_count
+            FROM shout_topic st
+            JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL AND s.published_at IS NOT NULL
+            JOIN shout_author sa ON sa.shout = s.id
+            WHERE st.topic IN ({",".join(map(str, topic_ids))})
+            GROUP BY st.topic
+        """
+        authors_stats = {row[0]: row[1] for row in session.execute(text(authors_stats_query))}
+
+        # Запрос на получение статистики комментариев для выбранных тем
+        comments_stats_query = f"""
+            SELECT st.topic, COUNT(DISTINCT r.id) as comments_count
+            FROM shout_topic st
+            JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL AND s.published_at IS NOT NULL
+            JOIN reaction r ON r.shout = s.id
+            WHERE st.topic IN ({",".join(map(str, topic_ids))})
+            GROUP BY st.topic
+        """
+        comments_stats = {row[0]: row[1] for row in session.execute(text(comments_stats_query))}
+
         # Формируем результат с добавлением статистики
         result = []
         for topic in topics:
@@ -134,6 +156,8 @@ async def get_topics_with_stats(limit=100, offset=0, community_id=None, by=None):
             topic_dict["stat"] = {
                 "shouts": shouts_stats.get(topic.id, 0),
                 "followers": followers_stats.get(topic.id, 0),
+                "authors": authors_stats.get(topic.id, 0),
+                "comments": comments_stats.get(topic.id, 0),
             }
             result.append(topic_dict)
@@ -202,23 +226,6 @@ async def get_topics_all(_, _info):
     return await get_all_topics()


-# Запрос на получение тем с пагинацией и статистикой
-@query.field("get_topics_paginated")
-async def get_topics_paginated(_, _info, limit=100, offset=0, by=None):
-    """
-    Получает список тем с пагинацией и статистикой.
-
-    Args:
-        limit: Максимальное количество возвращаемых тем
-        offset: Смещение для пагинации
-        by: Опциональные параметры сортировки
-
-    Returns:
-        list: Список тем с их статистикой
-    """
-    return await get_topics_with_stats(limit, offset, None, by)
-
-
 # Запрос на получение тем по сообществу
 @query.field("get_topics_by_community")
 async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0, by=None):

View File

@@ -33,7 +33,6 @@ input DraftInput {
   main_topic_id: Int # Changed from main_topic: Topic
   media: [MediaItemInput] # Changed to use MediaItemInput
   lead: String
-  description: String
   subtitle: String
   lang: String
   seo: String

View File

@@ -33,6 +33,7 @@ type Query {
   get_shout(slug: String, shout_id: Int): Shout
   load_shouts_by(options: LoadShoutsOptions): [Shout]
   load_shouts_search(text: String!, options: LoadShoutsOptions): [SearchResult]
+  get_search_results_count(text: String!): CountResult!
   load_shouts_bookmarked(options: LoadShoutsOptions): [Shout]

   # rating
@@ -60,7 +61,7 @@ type Query {
   get_topic(slug: String!): Topic
   get_topics_all: [Topic]
   get_topics_by_author(slug: String, user: String, author_id: Int): [Topic]
-  get_topics_by_community(slug: String, community_id: Int): [Topic]
+  get_topics_by_community(community_id: Int!, limit: Int, offset: Int): [Topic]

   # notifier
   load_notifications(after: Int!, limit: Int, offset: Int): NotificationsResult!
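
The two search queries added above are meant to be used together on the client: `load_shouts_search` returns one page of hits while `get_search_results_count` reports the total, so the frontend can render pagination. Below is a minimal client-side sketch; it is not code from this repository, the endpoint URL is a placeholder, and the `limit`/`offset` fields of `LoadShoutsOptions` are assumed from the resolver code in resolvers/reader.py.

import asyncio

import httpx

GRAPHQL_URL = "https://example.discours.io/graphql"  # hypothetical endpoint

SEARCH_QUERY = """
query Search($text: String!, $options: LoadShoutsOptions) {
  get_search_results_count(text: $text) { count }
  load_shouts_search(text: $text, options: $options) { id slug title }
}
"""

async def search_page(text: str, limit: int = 10, offset: int = 0):
    # One round-trip fetches both the total count and the requested page
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            GRAPHQL_URL,
            json={
                "query": SEARCH_QUERY,
                "variables": {"text": text, "options": {"limit": limit, "offset": offset}},
            },
        )
        resp.raise_for_status()
        data = resp.json()["data"]
        return data["get_search_results_count"]["count"], data["load_shouts_search"]

if __name__ == "__main__":
    total, results = asyncio.run(search_page("txtai", limit=10, offset=0))
    print(total, [r["slug"] for r in results])

Because the backend caches the full ranked result set (see services/search.py below), increasing `offset` is expected to page through the cached ranking rather than re-run the search.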

View File

@@ -80,7 +80,6 @@ type Shout {
   layout: String!

   lead: String
-  description: String
   subtitle: String
   lang: String
   cover: String
@@ -100,6 +99,7 @@ type Shout {
   featured_at: Int
   deleted_at: Int

+  seo: String # generated if not set
   version_of: Shout # TODO: use version_of somewhere
   draft: Draft
   media: [MediaItem]
@@ -111,13 +111,12 @@ type Draft {
   id: Int!
   created_at: Int!
   created_by: Author!
+  community: Community!

   layout: String
   slug: String
   title: String
   subtitle: String
   lead: String
-  description: String
   body: String
   media: [MediaItem]
   cover: String
@@ -137,7 +136,7 @@ type Draft {

 type Stat {
   rating: Int
-  commented: Int
+  comments_count: Int
   viewed: Int
   last_commented_at: Int
 }
@@ -208,6 +207,7 @@ type CommonResult {
 }

 type SearchResult {
+  id: Int!
   slug: String!
   title: String!
   cover: String
@@ -275,3 +275,7 @@ type MyRateComment {
   my_rate: ReactionKind
 }
+
+type CountResult {
+  count: Int!
+}

server.py (new file)
View File

@@ -0,0 +1,34 @@
+import sys
+from pathlib import Path
+
+from granian.constants import Interfaces
+from granian.log import LogLevels
+from granian.server import Server
+
+from settings import PORT
+from utils.logger import root_logger as logger
+
+if __name__ == "__main__":
+    logger.info("started")
+    try:
+        granian_instance = Server(
+            "main:app",
+            address="0.0.0.0",
+            port=PORT,
+            interface=Interfaces.ASGI,
+            workers=1,
+            websockets=False,
+            log_level=LogLevels.debug,
+            backlog=2048,
+        )
+
+        if "dev" in sys.argv:
+            logger.info("dev mode, building ssl context")
+            granian_instance.build_ssl_context(cert=Path("localhost.pem"), key=Path("localhost-key.pem"), password=None)
+        granian_instance.serve()
+    except Exception as error:
+        logger.error(error, exc_info=True)
+        raise
+    finally:
+        logger.info("stopped")

View File

@@ -259,3 +259,27 @@ def get_json_builder():
 # Используем их в коде
 json_builder, json_array_builder, json_cast = get_json_builder()
+
+
+async def fetch_all_shouts(session=None):
+    """Fetch all published shouts for search indexing"""
+    from orm.shout import Shout
+
+    close_session = False
+    if session is None:
+        session = local_session()
+        close_session = True
+
+    try:
+        # Fetch only published and non-deleted shouts
+        query = session.query(Shout).filter(
+            Shout.published_at.is_not(None),
+            Shout.deleted_at.is_(None)
+        )
+        shouts = query.all()
+        return shouts
+    except Exception as e:
+        logger.error(f"Error fetching shouts for search indexing: {e}")
+        return []
+    finally:
+        if close_session:
+            session.close()
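
A small usage sketch (an illustration, not repository code) of the optional `session` parameter: called bare, `fetch_all_shouts` opens and closes its own session; given an existing session, it reuses it and leaves closing to the caller.

from services.db import fetch_all_shouts, local_session

async def count_indexable_shouts():
    # Standalone call: the helper manages its own session
    standalone = await fetch_all_shouts()

    # Shared session: the helper reuses it and does not close it
    with local_session() as session:
        shared = await fetch_all_shouts(session=session)

    return len(standalone), len(shared)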

View File

@@ -2,231 +2,692 @@ import asyncio
import json import json
import logging import logging
import os import os
import httpx
import time
from collections import defaultdict
from datetime import datetime, timedelta
import orjson # Set up proper logging
from opensearchpy import OpenSearch
from services.redis import redis
from utils.encoders import CustomJSONEncoder
# Set redis logging level to suppress DEBUG messages
logger = logging.getLogger("search") logger = logging.getLogger("search")
logger.setLevel(logging.WARNING) logger.setLevel(logging.INFO) # Change to INFO to see more details
ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "") # Configuration for search service
ELASTIC_USER = os.environ.get("ELASTIC_USER", "") SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "") TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200) MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
ELASTIC_URL = os.environ.get(
"ELASTIC_URL",
f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}",
)
REDIS_TTL = 86400 # 1 день в секундах
index_settings = { # Search cache configuration
"settings": { SEARCH_CACHE_ENABLED = bool(os.environ.get("SEARCH_CACHE_ENABLED", "true").lower() in ["true", "1", "yes"])
"index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"}, SEARCH_CACHE_TTL_SECONDS = int(os.environ.get("SEARCH_CACHE_TTL_SECONDS", "900")) # Default: 15 minutes
"analysis": { SEARCH_MIN_SCORE = float(os.environ.get("SEARCH_MIN_SCORE", "0.1"))
"analyzer": { SEARCH_PREFETCH_SIZE = int(os.environ.get("SEARCH_PREFETCH_SIZE", "200"))
"ru": { SEARCH_USE_REDIS = bool(os.environ.get("SEARCH_USE_REDIS", "true").lower() in ["true", "1", "yes"])
"tokenizer": "standard",
"filter": ["lowercase", "ru_stop", "ru_stemmer"],
}
},
"filter": {
"ru_stemmer": {"type": "stemmer", "language": "russian"},
"ru_stop": {"type": "stop", "stopwords": "_russian_"},
},
},
},
"mappings": {
"properties": {
"body": {"type": "text", "analyzer": "ru"},
"title": {"type": "text", "analyzer": "ru"},
"subtitle": {"type": "text", "analyzer": "ru"},
"lead": {"type": "text", "analyzer": "ru"},
"media": {"type": "text", "analyzer": "ru"},
}
},
}
expected_mapping = index_settings["mappings"] search_offset = 0
# Создание цикла событий # Import Redis client if Redis caching is enabled
search_loop = asyncio.get_event_loop() if SEARCH_USE_REDIS:
try:
from services.redis import redis
logger.info("Redis client imported for search caching")
except ImportError:
logger.warning("Redis client import failed, falling back to memory cache")
SEARCH_USE_REDIS = False
# В начале файла добавим флаг class SearchCache:
SEARCH_ENABLED = bool(os.environ.get("ELASTIC_HOST", "")) """Cache for search results to enable efficient pagination"""
def __init__(self, ttl_seconds=SEARCH_CACHE_TTL_SECONDS, max_items=100):
self.cache = {} # Maps search query to list of results
self.last_accessed = {} # Maps search query to last access timestamp
self.ttl = ttl_seconds
self.max_items = max_items
self._redis_prefix = "search_cache:"
def get_indices_stats(): async def store(self, query, results):
indices_stats = search_service.client.cat.indices(format="json") """Store search results for a query"""
for index_info in indices_stats: normalized_query = self._normalize_query(query)
index_name = index_info["index"]
if not index_name.startswith("."):
index_health = index_info["health"]
index_status = index_info["status"]
pri_shards = index_info["pri"]
rep_shards = index_info["rep"]
docs_count = index_info["docs.count"]
docs_deleted = index_info["docs.deleted"]
store_size = index_info["store.size"]
pri_store_size = index_info["pri.store.size"]
logger.info(f"Index: {index_name}") if SEARCH_USE_REDIS:
logger.info(f"Health: {index_health}") try:
logger.info(f"Status: {index_status}") serialized_results = json.dumps(results)
logger.info(f"Primary Shards: {pri_shards}") await redis.set(
logger.info(f"Replica Shards: {rep_shards}") f"{self._redis_prefix}{normalized_query}",
logger.info(f"Documents Count: {docs_count}") serialized_results,
logger.info(f"Deleted Documents: {docs_deleted}") ex=self.ttl
logger.info(f"Store Size: {store_size}") )
logger.info(f"Primary Store Size: {pri_store_size}") logger.info(f"Stored {len(results)} search results for query '{query}' in Redis")
return True
except Exception as e:
logger.error(f"Error storing search results in Redis: {e}")
# Fall back to memory cache if Redis fails
# First cleanup if needed for memory cache
if len(self.cache) >= self.max_items:
self._cleanup()
# Store results and update timestamp
self.cache[normalized_query] = results
self.last_accessed[normalized_query] = time.time()
logger.info(f"Cached {len(results)} search results for query '{query}' in memory")
return True
async def get(self, query, limit=10, offset=0):
"""Get paginated results for a query"""
normalized_query = self._normalize_query(query)
all_results = None
# Try to get from Redis first
if SEARCH_USE_REDIS:
try:
cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}")
if cached_data:
all_results = json.loads(cached_data)
logger.info(f"Retrieved search results for '{query}' from Redis")
except Exception as e:
logger.error(f"Error retrieving search results from Redis: {e}")
# Fall back to memory cache if not in Redis
if all_results is None and normalized_query in self.cache:
all_results = self.cache[normalized_query]
self.last_accessed[normalized_query] = time.time()
logger.info(f"Retrieved search results for '{query}' from memory cache")
# If not found in any cache
if all_results is None:
logger.info(f"Cache miss for query '{query}'")
return None
# Return paginated subset
end_idx = min(offset + limit, len(all_results))
if offset >= len(all_results):
logger.warning(f"Requested offset {offset} exceeds result count {len(all_results)}")
return []
logger.info(f"Cache hit for '{query}': serving {offset}:{end_idx} of {len(all_results)} results")
return all_results[offset:end_idx]
async def has_query(self, query):
"""Check if query exists in cache"""
normalized_query = self._normalize_query(query)
# Check Redis first
if SEARCH_USE_REDIS:
try:
exists = await redis.get(f"{self._redis_prefix}{normalized_query}")
if exists:
return True
except Exception as e:
logger.error(f"Error checking Redis for query existence: {e}")
# Fall back to memory cache
return normalized_query in self.cache
async def get_total_count(self, query):
"""Get total count of results for a query"""
normalized_query = self._normalize_query(query)
# Check Redis first
if SEARCH_USE_REDIS:
try:
cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}")
if cached_data:
all_results = json.loads(cached_data)
return len(all_results)
except Exception as e:
logger.error(f"Error getting result count from Redis: {e}")
# Fall back to memory cache
if normalized_query in self.cache:
return len(self.cache[normalized_query])
return 0
def _normalize_query(self, query):
"""Normalize query string for cache key"""
if not query:
return ""
# Simple normalization - lowercase and strip whitespace
return query.lower().strip()
def _cleanup(self):
"""Remove oldest entries if memory cache is full"""
now = time.time()
# First remove expired entries
expired_keys = [
key for key, last_access in self.last_accessed.items()
if now - last_access > self.ttl
]
for key in expired_keys:
if key in self.cache:
del self.cache[key]
if key in self.last_accessed:
del self.last_accessed[key]
logger.info(f"Cleaned up {len(expired_keys)} expired search cache entries")
# If still above max size, remove oldest entries
if len(self.cache) >= self.max_items:
# Sort by last access time
sorted_items = sorted(self.last_accessed.items(), key=lambda x: x[1])
# Remove oldest 20%
remove_count = max(1, int(len(sorted_items) * 0.2))
for key, _ in sorted_items[:remove_count]:
if key in self.cache:
del self.cache[key]
if key in self.last_accessed:
del self.last_accessed[key]
logger.info(f"Removed {remove_count} oldest search cache entries")
class SearchService:
    def __init__(self):
        logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
        self.available = SEARCH_ENABLED
        # Use different timeout settings for indexing and search requests
        self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
        self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
        # Initialize search cache
        self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None

        if not self.available:
            logger.info("Search disabled (SEARCH_ENABLED = False)")

        if SEARCH_CACHE_ENABLED:
            cache_location = "Redis" if SEARCH_USE_REDIS else "Memory"
            logger.info(f"Search caching enabled using {cache_location} cache with TTL={SEARCH_CACHE_TTL_SECONDS}s")
            logger.info(f"Minimum score filter: {SEARCH_MIN_SCORE}, prefetch size: {SEARCH_PREFETCH_SIZE}")
    async def info(self):
        """Return information about search service"""
        if not self.available:
            return {"status": "disabled"}
        try:
            response = await self.client.get("/info")
            response.raise_for_status()
            result = response.json()
            logger.info(f"Search service info: {result}")
            return result
        except Exception as e:
            logger.error(f"Failed to get search info: {e}")
            return {"status": "error", "message": str(e)}

    def is_ready(self):
        """Check if service is available"""
        return self.available

    async def verify_docs(self, doc_ids):
        """Verify which documents exist in the search index"""
        if not self.available:
            return {"status": "disabled"}
        try:
            logger.info(f"Verifying {len(doc_ids)} documents in search index")
            response = await self.client.post(
                "/verify-docs",
                json={"doc_ids": doc_ids},
                timeout=60.0  # Longer timeout for potentially large ID lists
            )
            response.raise_for_status()
            result = response.json()

            # Log summary of verification results
            missing_count = len(result.get("missing", []))
            logger.info(f"Document verification complete: {missing_count} missing out of {len(doc_ids)} total")
            return result
        except Exception as e:
            logger.error(f"Document verification error: {e}")
            return {"status": "error", "message": str(e)}
    def index(self, shout):
        """Index a single document"""
        if not self.available:
            return
        logger.info(f"Indexing post {shout.id}")
        # Start in background to not block
        asyncio.create_task(self.perform_index(shout))

    async def perform_index(self, shout):
        """Actually perform the indexing operation"""
        if not self.available:
            return
        try:
            # Repeat title 3 times for higher keyword relevance
            title_repeat = ". ".join(filter(None, [shout.title] * 3))

            # Combine all text fields
            text_parts = [
                title_repeat,
                shout.subtitle or "",
                shout.lead or "",
                shout.body or "",
                shout.media or ""
            ]
            text = " ".join(filter(None, text_parts))

            if not text.strip():
                logger.warning(f"No text content to index for shout {shout.id}")
                return

            logger.info(f"Indexing document: ID={shout.id}, Text length={len(text)}")

            # Send to txtai service
            response = await self.client.post(
                "/index",
                json={"id": str(shout.id), "text": text}
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"Post {shout.id} successfully indexed: {result}")
        except Exception as e:
            logger.error(f"Indexing error for shout {shout.id}: {e}")
async def bulk_index(self, shouts):
"""Index multiple documents at once with adaptive batch sizing"""
if not self.available or not shouts:
logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}")
return
start_time = time.time()
logger.info(f"Starting bulk indexing of {len(shouts)} documents")
MAX_TEXT_LENGTH = 4000 # Maximum text length to send in a single request
max_batch_size = MAX_BATCH_SIZE
total_indexed = 0
total_skipped = 0
total_truncated = 0
total_retries = 0
# Group documents by size to process smaller documents in larger batches
small_docs = []
medium_docs = []
large_docs = []
# First pass: prepare all documents and categorize by size
for shout in shouts:
            try:
                # Repeat title 3 times for higher keyword relevance
                title_repeat = ". ".join(filter(None, [getattr(shout, "title", None)] * 3))
                text_fields = [title_repeat]
for field_name in ['subtitle', 'lead', 'body']:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
text_fields.append(field_value.strip())
# Media field processing remains the same
media = getattr(shout, 'media', None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if 'title' in media_json:
text_fields.append(media_json['title'])
if 'body' in media_json:
text_fields.append(media_json['body'])
except json.JSONDecodeError:
text_fields.append(media)
elif isinstance(media, dict):
if 'title' in media:
text_fields.append(media['title'])
if 'body' in media:
text_fields.append(media['body'])
text = " ".join(text_fields)
if not text.strip():
total_skipped += 1
continue
# Truncate text if it exceeds the maximum length
original_length = len(text)
if original_length > MAX_TEXT_LENGTH:
text = text[:MAX_TEXT_LENGTH]
total_truncated += 1
document = {
"id": str(shout.id),
"text": text
}
# Categorize by size
text_len = len(text)
if text_len > 5000:
large_docs.append(document)
elif text_len > 2000:
medium_docs.append(document)
else:
small_docs.append(document)
total_indexed += 1
            except Exception as e:
                logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
total_skipped += 1
# Process each category with appropriate batch sizes
logger.info(f"Documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
# Process small documents (larger batches)
if small_docs:
batch_size = min(max_batch_size, 15)
await self._process_document_batches(small_docs, batch_size, "small")
# Process medium documents (medium batches)
if medium_docs:
batch_size = min(max_batch_size, 10)
await self._process_document_batches(medium_docs, batch_size, "medium")
# Process large documents (small batches)
if large_docs:
batch_size = min(max_batch_size, 3)
await self._process_document_batches(large_docs, batch_size, "large")
elapsed = time.time() - start_time
logger.info(f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, {total_skipped} skipped, {total_truncated} truncated, {total_retries} retries")
async def _process_document_batches(self, documents, batch_size, size_category):
"""Process document batches with retry logic"""
# Check for possible database corruption before starting
db_error_count = 0
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
batch_id = f"{size_category}-{i//batch_size + 1}"
logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
retry_count = 0
max_retries = 3
success = False
# Process with retries
while not success and retry_count < max_retries:
try:
logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
response = await self.index_client.post(
"/bulk-index",
json=batch,
timeout=120.0 # Explicit longer timeout for large batches
)
# Handle 422 validation errors - these won't be fixed by retrying
if response.status_code == 422:
error_detail = response.json()
truncated_error = self._truncate_error_detail(error_detail)
logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
break
# Handle 500 server errors - these might be fixed by retrying with smaller batches
elif response.status_code == 500:
db_error_count += 1
# If we've seen multiple 500s, log a critical error
if db_error_count >= 3:
logger.critical(f"Multiple server errors detected (500). The search service may need manual intervention. Stopping batch {batch_id} processing.")
break
# Try again with exponential backoff
if retry_count < max_retries - 1:
retry_count += 1
wait_time = (2 ** retry_count) + (random.random() * 0.5) # Exponential backoff with jitter
await asyncio.sleep(wait_time)
continue
# Final retry, split the batch
elif len(batch) > 1:
mid = len(batch) // 2
await self._process_single_batch(batch[:mid], f"{batch_id}-A")
await self._process_single_batch(batch[mid:], f"{batch_id}-B")
break
else:
# Can't split a single document
break
# Normal success case
response.raise_for_status()
success = True
db_error_count = 0 # Reset error counter on success
except Exception as e:
error_str = str(e).lower()
if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
db_error_count += 1
if db_error_count >= 2:
logger.critical(f"Potential database corruption detected: {error_str}. The search service may need manual intervention. Stopping batch {batch_id} processing.")
break
if retry_count < max_retries - 1:
retry_count += 1
wait_time = (2 ** retry_count) + (random.random() * 0.5)
await asyncio.sleep(wait_time)
else:
if len(batch) > 1:
mid = len(batch) // 2
await self._process_single_batch(batch[:mid], f"{batch_id}-A")
await self._process_single_batch(batch[mid:], f"{batch_id}-B")
break
async def _process_single_batch(self, documents, batch_id):
"""Process a single batch with maximum reliability"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
if not documents:
return
response = await self.index_client.post(
"/bulk-index",
json=documents,
timeout=90.0
)
response.raise_for_status()
return # Success, exit the retry loop
except Exception as e:
error_str = str(e).lower()
retry_count += 1
if "dictionary changed size" in error_str or "transaction error" in error_str:
wait_time = (2 ** retry_count) + (random.random() * 0.5)
await asyncio.sleep(wait_time) # Wait for txtai to recover
continue
if retry_count >= max_retries and len(documents) > 1:
                    # Final fallback: index documents one by one, logging any per-document failure
                    for doc in documents:
                        try:
                            resp = await self.index_client.post("/index", json=doc, timeout=30.0)
                            resp.raise_for_status()
                        except Exception as e2:
                            logger.error(f"Failed to index document {doc.get('id', 'unknown')} individually: {e2}")
return # Exit after individual processing attempt
def _truncate_error_detail(self, error_detail):
"""Truncate error details for logging"""
truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail
if isinstance(truncated_detail, dict) and 'detail' in truncated_detail and isinstance(truncated_detail['detail'], list):
for i, item in enumerate(truncated_detail['detail']):
if isinstance(item, dict) and 'input' in item:
if isinstance(item['input'], dict) and any(k in item['input'] for k in ['documents', 'text']):
if 'documents' in item['input'] and isinstance(item['input']['documents'], list):
for j, doc in enumerate(item['input']['documents']):
if 'text' in doc and isinstance(doc['text'], str) and len(doc['text']) > 100:
item['input']['documents'][j]['text'] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]"
if 'text' in item['input'] and isinstance(item['input']['text'], str) and len(item['input']['text']) > 100:
item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
return truncated_detail
    async def search(self, text, limit, offset):
        """Search documents"""
        if not self.available:
            return []

        if not isinstance(text, str) or not text.strip():
            return []

        logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")

        # Check if we can serve from cache
        if SEARCH_CACHE_ENABLED:
            has_cache = await self.cache.has_query(text)
            if has_cache:
                cached_results = await self.cache.get(text, limit, offset)
                if cached_results is not None:
                    return cached_results

        # Not in cache or cache disabled, perform new search
        try:
            if SEARCH_CACHE_ENABLED:
                # Prefetch a larger page so later offsets can be served from cache
                search_limit = SEARCH_PREFETCH_SIZE
                search_offset = 0
            else:
                search_limit = limit
                search_offset = offset

            response = await self.client.post(
                "/search",
                json={"text": text, "limit": search_limit, "offset": search_offset}
            )
            response.raise_for_status()

            result = response.json()
formatted_results = result.get("results", [])
valid_results = []
for item in formatted_results:
doc_id = item.get("id")
if doc_id and doc_id.isdigit():
valid_results.append(item)
if len(valid_results) != len(formatted_results):
formatted_results = valid_results
if SEARCH_MIN_SCORE > 0:
initial_count = len(formatted_results)
formatted_results = [r for r in formatted_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
if SEARCH_CACHE_ENABLED:
await self.cache.store(text, formatted_results)
end_idx = offset + limit
page_results = formatted_results[offset:end_idx]
return page_results
return formatted_results
except Exception as e:
logger.error(f"Search error for '{text}': {e}", exc_info=True)
return []
async def check_index_status(self):
"""Get detailed statistics about the search index health"""
if not self.available:
return {"status": "disabled"}
try:
response = await self.client.get("/index-status")
response.raise_for_status()
result = response.json()
if result.get("consistency", {}).get("status") != "ok":
null_count = result.get("consistency", {}).get("null_embeddings_count", 0)
if null_count > 0:
logger.warning(f"Found {null_count} documents with NULL embeddings")
return result
except Exception as e:
logger.error(f"Failed to check index status: {e}")
return {"status": "error", "message": str(e)}
# Create the search service singleton
search_service = SearchService()


# API-compatible function to perform a search
async def search_text(text: str, limit: int = 50, offset: int = 0):
    payload = []
    if search_service.available:
        payload = await search_service.search(text, limit, offset)
    return payload
async def get_search_count(text: str):
"""Get total count of results for a query without fetching all results"""
if search_service.available and SEARCH_CACHE_ENABLED:
if await search_service.cache.has_query(text):
return await search_service.cache.get_total_count(text)
results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
return len(results)
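# Illustrative sketch only: a hypothetical resolver-style wrapper combining
# search_text() and get_search_count() for paginated responses. The wrapper name
# and page-size default are made up; the first call prefetches up to
# SEARCH_PREFETCH_SIZE results so later pages come from SearchCache.
async def _example_paginated_search(query: str, page: int = 0, page_size: int = 10):
    items = await search_text(query, limit=page_size, offset=page * page_size)
    total = await get_search_count(query)
    return {
        "items": items,        # [{"id": "...", "score": ...}, ...]
        "total": total,
        "page": page,
        "page_size": page_size,
    }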
async def initialize_search_index(shouts_data):
    """Initialize search index with existing data during application startup"""
if not SEARCH_ENABLED:
return
if not shouts_data:
return
info = await search_service.info()
if info.get("status") in ["error", "unavailable", "disabled"]:
return
index_stats = info.get("index_stats", {})
indexed_doc_count = index_stats.get("document_count", 0)
index_status = await search_service.check_index_status()
if index_status.get("status") == "inconsistent":
problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", [])
if problem_ids:
problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids]
if problem_docs:
await search_service.bulk_index(problem_docs)
db_ids = [str(shout.id) for shout in shouts_data]
try:
numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()]
if numeric_ids:
min_id = min(numeric_ids)
max_id = max(numeric_ids)
id_range = max_id - min_id + 1
    except Exception:
        pass  # best-effort ID-range diagnostics; errors here are non-fatal
if abs(indexed_doc_count - len(shouts_data)) > 10:
doc_ids = [str(shout.id) for shout in shouts_data]
verification = await search_service.verify_docs(doc_ids)
if verification.get("status") == "error":
return
missing_ids = verification.get("missing", [])
if missing_ids:
missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids]
await search_service.bulk_index(missing_docs)
else:
pass
try:
test_query = "test"
test_results = await search_text(test_query, 5)
if test_results:
categories = set()
for result in test_results:
result_id = result.get("id")
matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
if matching_shouts and hasattr(matching_shouts[0], 'category'):
categories.add(getattr(matching_shouts[0], 'category', 'unknown'))
    except Exception:
        pass  # the test query is only a smoke check; errors here are non-fatal
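# Illustrative sketch only: wiring initialize_search_index() into application
# startup. `load_shouts_for_indexing` is a hypothetical async callable returning
# ORM Shout objects; the real project loads them elsewhere.
async def _example_startup(load_shouts_for_indexing):
    shouts_data = await load_shouts_for_indexing()
    # Checks index consistency, bulk-indexes missing or broken documents,
    # then runs a small test query against the txtai service.
    await initialize_search_index(shouts_data)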


@@ -2,9 +2,7 @@ import asyncio
 import os
 import time
 from datetime import datetime, timedelta, timezone
-from typing import Dict
-
-import orjson
+from typing import Dict, Optional
 
 # ga
 from google.analytics.data_v1beta import BetaAnalyticsDataClient
@@ -20,33 +18,39 @@ from orm.author import Author
 from orm.shout import Shout, ShoutAuthor, ShoutTopic
 from orm.topic import Topic
 from services.db import local_session
+from services.redis import redis
 from utils.logger import root_logger as logger
 
 GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", "/dump/google-service.json")
 GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID", "")
-VIEWS_FILEPATH = "/dump/views.json"
 
 
 class ViewedStorage:
+    """
+    Класс для хранения и доступа к данным о просмотрах.
+    Использует Redis в качестве основного хранилища и Google Analytics для сбора новых данных.
+    """
+
     lock = asyncio.Lock()
-    precounted_by_slug = {}
     views_by_shout = {}
     shouts_by_topic = {}
     shouts_by_author = {}
     views = None
     period = 60 * 60  # каждый час
-    analytics_client: BetaAnalyticsDataClient | None = None
+    analytics_client: Optional[BetaAnalyticsDataClient] = None
    auth_result = None
     running = False
+    redis_views_key = None
+    last_update_timestamp = 0
     start_date = datetime.now().strftime("%Y-%m-%d")
 
     @staticmethod
     async def init():
-        """Подключение к клиенту Google Analytics с использованием аутентификации"""
+        """Подключение к клиенту Google Analytics и загрузка данных о просмотрах из Redis"""
         self = ViewedStorage
         async with self.lock:
-            # Загрузка предварительно подсчитанных просмотров из файла JSON
-            self.load_precounted_views()
+            # Загрузка предварительно подсчитанных просмотров из Redis
+            await self.load_views_from_redis()
 
             os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH)
             if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
@@ -62,40 +66,54 @@ class ViewedStorage:
             self.running = False
 
     @staticmethod
-    def load_precounted_views():
-        """Загрузка предварительно подсчитанных просмотров из файла JSON"""
+    async def load_views_from_redis():
+        """Загрузка предварительно подсчитанных просмотров из Redis"""
         self = ViewedStorage
-        viewfile_path = VIEWS_FILEPATH
-        if not os.path.exists(viewfile_path):
-            viewfile_path = os.path.join(os.path.curdir, "views.json")
-            if not os.path.exists(viewfile_path):
-                logger.warning(" * views.json not found")
-                return
 
-        logger.info(f" * loading views from {viewfile_path}")
-        try:
-            start_date_int = os.path.getmtime(viewfile_path)
-            start_date_str = datetime.fromtimestamp(start_date_int).strftime("%Y-%m-%d")
-            self.start_date = start_date_str
+        # Подключаемся к Redis если соединение не установлено
+        if not redis._client:
+            await redis.connect()
+
+        # Получаем список всех ключей migrated_views_* и находим самый последний
+        keys = await redis.execute("KEYS", "migrated_views_*")
+        if not keys:
+            logger.warning(" * No migrated_views keys found in Redis")
+            return
+
+        # Фильтруем только ключи timestamp формата (исключаем migrated_views_slugs)
+        timestamp_keys = [k for k in keys if k != "migrated_views_slugs"]
+        if not timestamp_keys:
+            logger.warning(" * No migrated_views timestamp keys found in Redis")
+            return
+
+        # Сортируем по времени создания (в названии ключа) и берем последний
+        timestamp_keys.sort()
+        latest_key = timestamp_keys[-1]
+        self.redis_views_key = latest_key
+
+        # Получаем метку времени создания для установки start_date
+        timestamp = await redis.execute("HGET", latest_key, "_timestamp")
+        if timestamp:
+            self.last_update_timestamp = int(timestamp)
+            timestamp_dt = datetime.fromtimestamp(int(timestamp))
+            self.start_date = timestamp_dt.strftime("%Y-%m-%d")
+
+        # Если данные сегодняшние, считаем их актуальными
         now_date = datetime.now().strftime("%Y-%m-%d")
         if now_date == self.start_date:
-            logger.info(" * views data is up to date!")
+            logger.info(" * Views data is up to date!")
         else:
-            logger.warn(f" * {viewfile_path} is too old: {self.start_date}")
+            logger.warning(f" * Views data is from {self.start_date}, may need update")
 
-        with open(viewfile_path, "r") as file:
-            precounted_views = orjson.loads(file.read())
-            self.precounted_by_slug.update(precounted_views)
-            logger.info(f" * {len(precounted_views)} shouts with views was loaded.")
-        except Exception as e:
-            logger.error(f"precounted views loading error: {e}")
+        # Выводим информацию о количестве загруженных записей
+        total_entries = await redis.execute("HGET", latest_key, "_total")
+        if total_entries:
+            logger.info(f" * {total_entries} shouts with views loaded from Redis key: {latest_key}")
 
     # noinspection PyTypeChecker
     @staticmethod
     async def update_pages():
-        """Запрос всех страниц от Google Analytics, отсортрованных по количеству просмотров"""
+        """Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
         self = ViewedStorage
         logger.info(" ⎧ views update from Google Analytics ---")
         if self.running:
@@ -140,15 +158,40 @@ class ViewedStorage:
             self.running = False
 
     @staticmethod
-    def get_shout(shout_slug="", shout_id=0) -> int:
-        """Получение метрики просмотров shout по slug или id."""
+    async def get_shout(shout_slug="", shout_id=0) -> int:
+        """
+        Получение метрики просмотров shout по slug или id.
+
+        Args:
+            shout_slug: Slug публикации
+            shout_id: ID публикации
+
+        Returns:
+            int: Количество просмотров
+        """
         self = ViewedStorage
+
+        # Получаем данные из Redis для новой схемы хранения
+        if not redis._client:
+            await redis.connect()
+
         fresh_views = self.views_by_shout.get(shout_slug, 0)
-        precounted_views = self.precounted_by_slug.get(shout_slug, 0)
-        return fresh_views + precounted_views
+
+        # Если есть id, пытаемся получить данные из Redis по ключу migrated_views_<timestamp>
+        if shout_id and self.redis_views_key:
+            precounted_views = await redis.execute("HGET", self.redis_views_key, str(shout_id))
+            if precounted_views:
+                return fresh_views + int(precounted_views)
+
+        # Если нет id или данных, пытаемся получить по slug из отдельного хеша
+        precounted_views = await redis.execute("HGET", "migrated_views_slugs", shout_slug)
+        if precounted_views:
+            return fresh_views + int(precounted_views)
+
+        return fresh_views
 
     @staticmethod
-    def get_shout_media(shout_slug) -> Dict[str, int]:
+    async def get_shout_media(shout_slug) -> Dict[str, int]:
         """Получение метрики воспроизведения shout по slug."""
         self = ViewedStorage
@@ -157,23 +200,29 @@ class ViewedStorage:
         return self.views_by_shout.get(shout_slug, 0)
 
     @staticmethod
-    def get_topic(topic_slug) -> int:
+    async def get_topic(topic_slug) -> int:
         """Получение суммарного значения просмотров темы."""
         self = ViewedStorage
-        return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_topic.get(topic_slug, []))
+        views_count = 0
+        for shout_slug in self.shouts_by_topic.get(topic_slug, []):
+            views_count += await self.get_shout(shout_slug=shout_slug)
+        return views_count
 
     @staticmethod
-    def get_author(author_slug) -> int:
+    async def get_author(author_slug) -> int:
         """Получение суммарного значения просмотров автора."""
         self = ViewedStorage
-        return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_author.get(author_slug, []))
+        views_count = 0
+        for shout_slug in self.shouts_by_author.get(author_slug, []):
+            views_count += await self.get_shout(shout_slug=shout_slug)
+        return views_count
 
     @staticmethod
     def update_topics(shout_slug):
         """Обновление счетчиков темы по slug shout"""
         self = ViewedStorage
         with local_session() as session:
             # Определение вспомогательной функции для избежания повторения кода
             def update_groups(dictionary, key, value):
                 dictionary[key] = list(set(dictionary.get(key, []) + [value]))
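# Illustrative sketch only (editorial addition, not part of the diff above): the
# Redis layout that load_views_from_redis() and get_shout() expect — a
# migrated_views_<timestamp> hash keyed by shout id with _timestamp/_total service
# fields, plus a migrated_views_slugs hash keyed by slug. Key names follow the
# diff; the seeding code itself is hypothetical and uses redis.asyncio directly.
import time
import redis.asyncio as aioredis

async def _example_seed_migrated_views():
    client = aioredis.Redis()
    ts = int(time.time())
    key = f"migrated_views_{ts}"
    # Per-shout precounted views keyed by numeric shout id, plus service fields
    await client.hset(key, mapping={"123": 42, "456": 7, "_timestamp": ts, "_total": 2})
    # Slug-addressed fallback hash used when only a slug is known
    await client.hset("migrated_views_slugs", mapping={"my-first-post": 42})
    await client.aclose()  # requires redis-py >= 5; use close() on older versions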