57 Commits

Author SHA1 Message Date
Stepan Vladovskiy
27b0928e73 feat: title weight procedure
All checks were successful
Deploy on push / deploy (push) Successful in 1m16s
2025-04-15 19:32:34 -03:00
Stepan Vladovskiy
e382cc1ea5 Merge branch 'dev' into feat/sv-searching-txtai
All checks were successful
Deploy on push / deploy (push) Successful in 6s
:
2025-04-15 19:20:48 -03:00
6920351b82 schema-fix
All checks were successful
Deploy on push / deploy (push) Successful in 52s
2025-04-15 20:30:12 +03:00
eb216a5f36 draft-seo-handling
All checks were successful
Deploy on push / deploy (push) Successful in 1m10s
2025-04-15 20:16:01 +03:00
bd129efde6 update-seo-handling 2025-04-15 20:14:42 +03:00
b9f6033e66 generate seo text when draft created 2025-04-15 20:09:22 +03:00
710f522c8f schema-upgrade
All checks were successful
Deploy on push / deploy (push) Successful in 47s
2025-04-14 19:53:14 +03:00
0de4404cb1 draft-community
All checks were successful
Deploy on push / deploy (push) Successful in 47s
2025-04-14 16:02:19 +03:00
to
83d61ca76d Merge branch 'dev' into feat/sv-searching-txtai
All checks were successful
Deploy on push / deploy (push) Successful in 6s
2025-04-13 05:36:18 +00:00
1c61e889d6 update-draft-fix
All checks were successful
Deploy on push / deploy (push) Successful in 47s
2025-04-10 22:51:07 +03:00
fdedb75a2c topics-comments-stat
All checks were successful
Deploy on push / deploy (push) Successful in 45s
2025-04-10 19:14:27 +03:00
f20000f1f6 topic.stat.authors-fix2
All checks were successful
Deploy on push / deploy (push) Successful in 45s
2025-04-10 18:46:09 +03:00
7d50638b3a topic.stat.authors-fix
All checks were successful
Deploy on push / deploy (push) Successful in 1m20s
2025-04-10 18:39:31 +03:00
Stepan Vladovskiy
106222b0e0 debug: without debug logging. clean
All checks were successful
Deploy on push / deploy (push) Successful in 1m27s
2025-04-07 11:41:48 -03:00
Stepan Vladovskiy
c533241d1e fix(reader): sorting by rang not by id in cash
All checks were successful
Deploy on push / deploy (push) Successful in 6s
2025-04-03 13:51:13 -03:00
Stepan Vladovskiy
78326047bf fix(reader.py): change sorting and answer on querys
All checks were successful
Deploy on push / deploy (push) Successful in 50s
2025-04-03 13:20:18 -03:00
Stepan Vladovskiy
bc4ec79240 fix(search.py): store all results in cash not only first offset
All checks were successful
Deploy on push / deploy (push) Successful in 52s
2025-04-03 13:10:53 -03:00
Stepan Vladovskiy
a0db5707c4 feat: add cash for storing searchresalts and hold them for working pagination. Now we are have offset for use on frontend
All checks were successful
Deploy on push / deploy (push) Successful in 51s
2025-04-01 16:01:09 -03:00
Stepan Vladovskiy
ecc443c3ad refactor(reader.py): Remove the unnecessary topic joins that cause duplicate results
All checks were successful
Deploy on push / deploy (push) Successful in 51s
2025-04-01 12:57:46 -03:00
Stepan Vladovskiy
9a02ca74ad merged with dev
All checks were successful
Deploy on push / deploy (push) Successful in 1m24s
2025-03-31 13:38:32 -03:00
Stepan Vladovskiy
9ebb81cbd3 refactor(reader.py): rm debug line 2025-03-31 13:32:51 -03:00
abbc074474 updateby-fix
All checks were successful
Deploy on push / deploy (push) Successful in 54s
2025-03-31 14:39:02 +03:00
Stepan Vladovskiy
0bc55977ac debug(reader.py): query_with_stat(info) always
All checks were successful
Deploy on push / deploy (push) Successful in 51s
2025-03-27 15:18:08 -03:00
Stepan Vladovskiy
ff3a4debce debug(reader.py): trying to handle main topic ids founded
All checks were successful
Deploy on push / deploy (push) Successful in 54s
2025-03-27 14:43:17 -03:00
Stepan Vladovskiy
ae85b32f69 feat(type.qraphql): SearchResult with shout id
All checks were successful
Deploy on push / deploy (push) Successful in 51s
2025-03-27 14:06:52 -03:00
Stepan Vladovskiy
34a354e9e3 debug(reader.py: trying back shout id in query call
All checks were successful
Deploy on push / deploy (push) Successful in 52s
2025-03-27 11:54:56 -03:00
4f599e097f [0.4.17] - 2025-03-26
All checks were successful
Deploy on push / deploy (push) Successful in 54s
- Fixed `'Reaction' object is not subscriptable` error in hierarchical comments:
  - Modified `get_reactions_with_stat()` to convert Reaction objects to dictionaries
  - Added default values for limit/offset parameters
  - Fixed `load_first_replies()` implementation with proper parameter passing
  - Added doctest with example usage
  - Limited child comments to 100 per parent for performance
2025-03-26 08:54:10 +03:00
a5eaf4bb65 commented->comments_count
All checks were successful
Deploy on push / deploy (push) Successful in 55s
2025-03-26 08:25:18 +03:00
Stepan Vladovskiy
e405fb527b refactor(search.py): moved to use one table docs for embdings and docs store
All checks were successful
Deploy on push / deploy (push) Successful in 50s
2025-03-25 16:42:44 -03:00
Stepan Vladovskiy
7f36f93d92 feat(search.py): detects both missing documents and null embeddings
All checks were successful
Deploy on push / deploy (push) Successful in 1m32s
2025-03-25 15:18:29 -03:00
Stepan Vladovskiy
f089a32394 debug(search.py): with more logs when check sync of indexing
All checks were successful
Deploy on push / deploy (push) Successful in 1m3s
2025-03-25 14:44:05 -03:00
Stepan Vladovskiy
1fd623a660 feat: with index sync endpoints configs
All checks were successful
Deploy on push / deploy (push) Successful in 56s
2025-03-25 13:31:45 -03:00
Stepan Vladovskiy
88012f1b8c debug(server.py): with 4 workers (threds). cheking reindexing
All checks were successful
Deploy on push / deploy (push) Successful in 55s
2025-03-25 12:21:59 -03:00
Stepan Vladovskiy
6e284640c0 feat: give little timeout for resource stab
All checks were successful
Deploy on push / deploy (push) Successful in 51s
2025-03-24 21:42:51 -03:00
Stepan Vladovskiy
077cb46482 debug: server.py -> threds 1 , search.py -> add 3 times reconect
All checks were successful
Deploy on push / deploy (push) Successful in 49s
2025-03-24 20:16:07 -03:00
Stepan Vladovskiy
60a13a9097 refactor(search.py): moved initialization logic in search-txtai instance
All checks were successful
Deploy on push / deploy (push) Successful in 55s
2025-03-24 19:47:02 -03:00
3c56fdfaea get_topics_paginated-fix
All checks were successful
Deploy on push / deploy (push) Successful in 56s
2025-03-22 18:49:15 +03:00
81a8bf3c58 query-type-fix
All checks were successful
Deploy on push / deploy (push) Successful in 49s
2025-03-22 18:44:31 +03:00
Stepan Vladovskiy
316375bf18 debug(search.py): encrease batch size for bulk indexing
All checks were successful
Deploy on push / deploy (push) Successful in 1m1s
2025-03-21 17:56:54 -03:00
Stepan Vladovskiy
fb820f67fd debug(search.py): encrease batch size for bulk indexing
All checks were successful
Deploy on push / deploy (push) Successful in 53s
2025-03-21 17:48:26 -03:00
Stepan Vladovskiy
f1d9f4e036 feat(search.py): with db reset endpoint
All checks were successful
Deploy on push / deploy (push) Successful in 53s
2025-03-21 17:28:54 -03:00
Stepan Vladovskiy
ebb67eb311 debug: decrease chars in search.py for bulk indexing
All checks were successful
Deploy on push / deploy (push) Successful in 52s
2025-03-21 16:53:00 -03:00
Stepan Vladovskiy
50a8c24ead feat(search.py): documnet for bulk indexing are categorized
All checks were successful
Deploy on push / deploy (push) Successful in 55s
2025-03-21 15:40:29 -03:00
Stepan Vladovskiy
eb4b9363ab debug: change logs entris and indexing not wraps all in documents
All checks were successful
Deploy on push / deploy (push) Successful in 53s
2025-03-21 14:32:45 -03:00
Stepan Vladovskiy
19c5028a0c debug: Limit max chars for bulk indexing
All checks were successful
Deploy on push / deploy (push) Successful in 53s
2025-03-21 14:18:32 -03:00
Stepan Vladovskiy
57e1e8e6bd debug: more logs in indexing
All checks were successful
Deploy on push / deploy (push) Successful in 53s
2025-03-21 14:10:09 -03:00
Stepan Vladovskiy
385057ffcd debug: with logs in indexing procedure
All checks were successful
Deploy on push / deploy (push) Successful in 54s
2025-03-21 13:45:50 -03:00
Stepan Vladovskiy
90699768ff debug: start index
All checks were successful
Deploy on push / deploy (push) Successful in 55s
2025-03-21 13:30:23 -03:00
Stepan Vladovskiy
ad0ca75aa9 debug: no redis for indexing in nackend side
All checks were successful
Deploy on push / deploy (push) Successful in 1m41s
2025-03-19 14:47:31 -03:00
Stepan Vladovskiy
39242d5e6c debug: add logs in search.py and change and input validation ... index ver too
All checks were successful
Deploy on push / deploy (push) Successful in 55s
2025-03-12 14:13:55 -03:00
Stepan Vladovskiy
24cca7f2cb debug: something wrong one stap back with logs
All checks were successful
Deploy on push / deploy (push) Successful in 53s
2025-03-12 13:11:19 -03:00
Stepan Vladovskiy
a9c7ac49d6 feat: with logs >>>
All checks were successful
Deploy on push / deploy (push) Successful in 59s
2025-03-12 13:07:27 -03:00
Stepan Vladovskiy
f249752db5 feat: moved txtai and search procedure in different instance
All checks were successful
Deploy on push / deploy (push) Successful in 2m18s
2025-03-12 12:06:09 -03:00
Stepan Vladovskiy
c0b2116da2 feat(db.py): added fetch_all_shouts, to populate the search index
All checks were successful
Deploy on push / deploy (push) Successful in 35s
2025-03-05 20:32:34 +00:00
Stepan Vladovskiy
59e71c8144 debug: fixed workflows gitea
All checks were successful
Deploy on push / deploy (push) Successful in 4m41s
2025-03-05 20:17:34 +00:00
Stepan Vladovskiy
e6a416383d debug: fixed workflows gitea
All checks were successful
Deploy on push / deploy (push) Successful in 15s
2025-03-05 20:16:32 +00:00
Stepan Vladovskiy
d55448398d feat(search.py): change to txtai server, with ai model. And fix granian workers 2025-03-05 20:08:21 +00:00
23 changed files with 1038 additions and 333 deletions

View File

@@ -29,7 +29,16 @@ jobs:
if: github.ref == 'refs/heads/dev'
uses: dokku/github-action@master
with:
branch: 'dev'
branch: 'main'
force: true
git_remote_url: 'ssh://dokku@v2.discours.io:22/core'
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
- name: Push to dokku for staging branch
if: github.ref == 'refs/heads/staging'
uses: dokku/github-action@master
with:
branch: 'dev'
git_remote_url: 'ssh://dokku@staging.discours.io:22/core'
ssh_private_key: ${{ secrets.SSH_PRIVATE_KEY }}
git_push_flags: '--force'

6
.gitignore vendored
View File

@@ -128,6 +128,9 @@ dmypy.json
.idea
temp.*
# Debug
DEBUG.log
discours.key
discours.crt
discours.pem
@@ -161,4 +164,5 @@ views.json
*.key
*.crt
*cache.json
.cursor
.cursor
.devcontainer/

View File

@@ -1,9 +1,29 @@
#### [0.4.19] - 2025-04-14
- dropped `Shout.description` and `Draft.description` to be UX-generated
- use redis to init views counters after migrator
#### [0.4.18] - 2025-04-10
- Fixed `Topic.stat.authors` and `Topic.stat.comments`
- Fixed unique constraint violation for empty slug values:
- Modified `update_draft` resolver to handle empty slug values
- Modified `create_draft` resolver to prevent empty slug values
- Added validation to prevent inserting or updating drafts with empty slug
- Fixed database error "duplicate key value violates unique constraint draft_slug_key"
#### [0.4.17] - 2025-03-26
- Fixed `'Reaction' object is not subscriptable` error in hierarchical comments:
- Modified `get_reactions_with_stat()` to convert Reaction objects to dictionaries
- Added default values for limit/offset parameters
- Fixed `load_first_replies()` implementation with proper parameter passing
- Added doctest with example usage
- Limited child comments to 100 per parent for performance
#### [0.4.16] - 2025-03-22
- Added hierarchical comments pagination:
- Created new GraphQL query `load_comments_branch` for efficient loading of hierarchical comments
- Ability to load root comments with their first N replies
- Added pagination for both root and child comments
- Using existing `commented` field in `Stat` type to display number of replies
- Using existing `comments_count` field in `Stat` type to display number of replies
- Added special `first_replies` field to store first replies to a comment
- Optimized SQL queries for efficient loading of comment hierarchies
- Implemented flexible comment sorting system (by time, rating)
@@ -41,8 +61,7 @@
- Implemented persistent Redis caching for author queries without TTL (invalidated only on changes)
- Optimized author retrieval with separate endpoints:
- `get_authors_all` - returns all non-deleted authors without statistics
- `get_authors_paginated` - returns authors with statistics and pagination support
- `load_authors_by` - optimized to use caching and efficient sorting
- `load_authors_by` - optimized to use caching and efficient sorting and pagination
- Improved SQL queries with optimized JOIN conditions and efficient filtering
- Added pre-aggregation of statistics (shouts count, followers count) in single efficient queries
- Implemented robust cache invalidation on author updates
@@ -54,7 +73,6 @@
- Implemented persistent Redis caching for topic queries (no TTL, invalidated only on changes)
- Optimized topic retrieval with separate endpoints for different use cases:
- `get_topics_all` - returns all topics without statistics for lightweight listing
- `get_topics_paginated` - returns topics with statistics and pagination support
- `get_topics_by_community` - adds pagination and optimized filtering by community
- Added SQLAlchemy-managed indexes directly in ORM models for automatic schema maintenance
- Created `sync_indexes()` function for automatic index synchronization during app startup
@@ -152,7 +170,7 @@
#### [0.4.4]
- `followers_stat` removed for shout
- sqlite3 support added
- `rating_stat` and `commented_stat` fixes
- `rating_stat` and `comments_count` fixes
#### [0.4.3]
- cache reimplemented

5
cache/cache.py vendored
View File

@@ -545,8 +545,9 @@ async def get_cached_data(key: str) -> Optional[Any]:
try:
cached_data = await redis.execute("GET", key)
if cached_data:
logger.debug(f"Данные получены из кеша по ключу {key}")
return orjson.loads(cached_data)
loaded = orjson.loads(cached_data)
logger.debug(f"Данные получены из кеша по ключу {key}: {len(loaded)}")
return loaded
return None
except Exception as e:
logger.error(f"Ошибка при получении данных из кеша: {e}")

View File

@@ -45,7 +45,7 @@ query LoadCommentsBranch(
reply_to
stat {
rating
commented
comments_count
}
first_replies {
id
@@ -61,7 +61,7 @@ query LoadCommentsBranch(
reply_to
stat {
rating
commented
comments_count
}
}
}
@@ -92,7 +92,7 @@ query LoadCommentsBranch(
- `reply_to`: ID родительского комментария (null для корневых)
- `first_replies`: Первые N дочерних комментариев
- `stat`: Статистика комментария, включающая:
- `commented`: Количество ответов на комментарий
- `comments_count`: Количество ответов на комментарий
- `rating`: Рейтинг комментария
## Примеры использования
@@ -150,7 +150,7 @@ const { data } = await client.query({
1. Для эффективной работы со сложными ветками обсуждений рекомендуется:
- Сначала загружать только корневые комментарии с первыми N ответами
- При наличии дополнительных ответов (когда `stat.commented > first_replies.length`)
- При наличии дополнительных ответов (когда `stat.comments_count > first_replies.length`)
добавить кнопку "Показать все ответы"
- При нажатии на кнопку загружать дополнительные ответы с помощью запроса с указанным `parentId`

View File

@@ -42,7 +42,7 @@
- Отдельный запрос `load_comments_branch` для оптимизированной загрузки ветки комментариев
- Возможность загрузки корневых комментариев статьи с первыми ответами на них
- Гибкая пагинация как для корневых, так и для дочерних комментариев
- Использование поля `stat.commented` для отображения количества ответов на комментарий
- Использование поля `stat.comments_count` для отображения количества ответов на комментарий
- Добавление специального поля `first_replies` для хранения первых ответов на комментарий
- Поддержка различных методов сортировки (новые, старые, популярные)
- Оптимизированные SQL запросы для минимизации нагрузки на базу данных

48
main.py
View File

@@ -17,7 +17,8 @@ from cache.revalidator import revalidation_manager
from services.exception import ExceptionHandlerMiddleware
from services.redis import redis
from services.schema import create_all_tables, resolvers
from services.search import search_service
#from services.search import search_service
from services.search import search_service, initialize_search_index
from services.viewed import ViewedStorage
from services.webhook import WebhookEndpoint, create_webhook_endpoint
from settings import DEV_SERVER_PID_FILE_NAME, MODE
@@ -34,24 +35,67 @@ async def start():
f.write(str(os.getpid()))
print(f"[main] process started in {MODE} mode")
async def check_search_service():
"""Check if search service is available and log result"""
info = await search_service.info()
if info.get("status") in ["error", "unavailable"]:
print(f"[WARNING] Search service unavailable: {info.get('message', 'unknown reason')}")
else:
print(f"[INFO] Search service is available: {info}")
# indexing DB data
# async def indexing():
# from services.db import fetch_all_shouts
# all_shouts = await fetch_all_shouts()
# await initialize_search_index(all_shouts)
async def lifespan(_app):
try:
print("[lifespan] Starting application initialization")
create_all_tables()
await asyncio.gather(
redis.connect(),
precache_data(),
ViewedStorage.init(),
create_webhook_endpoint(),
search_service.info(),
check_search_service(),
start(),
revalidation_manager.start(),
)
print("[lifespan] Basic initialization complete")
# Add a delay before starting the intensive search indexing
print("[lifespan] Waiting for system stabilization before search indexing...")
await asyncio.sleep(10) # 10-second delay to let the system stabilize
# Start search indexing as a background task with lower priority
asyncio.create_task(initialize_search_index_background())
yield
finally:
print("[lifespan] Shutting down application services")
tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
await asyncio.gather(*tasks, return_exceptions=True)
print("[lifespan] Shutdown complete")
# Initialize search index in the background
async def initialize_search_index_background():
"""Run search indexing as a background task with low priority"""
try:
print("[search] Starting background search indexing process")
from services.db import fetch_all_shouts
# Get total count first (optional)
all_shouts = await fetch_all_shouts()
total_count = len(all_shouts) if all_shouts else 0
print(f"[search] Fetched {total_count} shouts for background indexing")
# Start the indexing process with the fetched shouts
print("[search] Beginning background search index initialization...")
await initialize_search_index(all_shouts)
print("[search] Background search index initialization complete")
except Exception as e:
print(f"[search] Error in background search indexing: {str(e)}")
# Создаем экземпляр GraphQL
graphql_app = GraphQL(schema, debug=True)

View File

@@ -31,6 +31,7 @@ class Draft(Base):
# required
created_at: int = Column(Integer, nullable=False, default=lambda: int(time.time()))
created_by: int = Column(ForeignKey("author.id"), nullable=False)
community: int = Column(ForeignKey("community.id"), nullable=False, default=1)
# optional
layout: str = Column(String, nullable=True, default="article")
@@ -38,7 +39,6 @@ class Draft(Base):
title: str = Column(String, nullable=True)
subtitle: str | None = Column(String, nullable=True)
lead: str | None = Column(String, nullable=True)
description: str | None = Column(String, nullable=True)
body: str = Column(String, nullable=False, comment="Body")
media: dict | None = Column(JSON, nullable=True)
cover: str | None = Column(String, nullable=True, comment="Cover image url")

View File

@@ -91,7 +91,6 @@ class Shout(Base):
cover: str | None = Column(String, nullable=True, comment="Cover image url")
cover_caption: str | None = Column(String, nullable=True, comment="Cover image alt caption")
lead: str | None = Column(String, nullable=True)
description: str | None = Column(String, nullable=True)
title: str = Column(String, nullable=False)
subtitle: str | None = Column(String, nullable=True)
layout: str = Column(String, nullable=False, default="article")

View File

@@ -13,5 +13,10 @@ starlette
gql
ariadne
granian
# NLP and search
httpx
orjson
pydantic
pydantic
trafilatura

View File

@@ -232,22 +232,6 @@ async def get_authors_all(_, _info):
return await get_all_authors()
@query.field("get_authors_paginated")
async def get_authors_paginated(_, _info, limit=50, offset=0, by=None):
"""
Получает список авторов с пагинацией и статистикой.
Args:
limit: Максимальное количество возвращаемых авторов
offset: Смещение для пагинации
by: Параметр сортировки (new/active)
Returns:
list: Список авторов с их статистикой
"""
return await get_authors_with_stats(limit, offset, by)
@query.field("get_author")
async def get_author(_, _info, slug="", author_id=0):
author_dict = None

View File

@@ -1,6 +1,7 @@
import time
from operator import or_
import trafilatura
from sqlalchemy.sql import and_
from cache.cache import (
@@ -30,7 +31,6 @@ def create_shout_from_draft(session, draft, author_id):
cover=draft.cover,
cover_caption=draft.cover_caption,
lead=draft.lead,
description=draft.description,
title=draft.title,
subtitle=draft.subtitle,
layout=draft.layout,
@@ -105,12 +105,21 @@ async def create_draft(_, info, draft_input):
if "title" not in draft_input or not draft_input["title"]:
draft_input["title"] = "" # Пустая строка вместо NULL
# Проверяем slug - он должен быть или не пустым, или не передаваться вообще
if "slug" in draft_input and (draft_input["slug"] is None or draft_input["slug"] == ""):
# При создании черновика удаляем пустой slug из входных данных
del draft_input["slug"]
try:
with local_session() as session:
# Remove id from input if present since it's auto-generated
if "id" in draft_input:
del draft_input["id"]
if "seo" not in draft_input and not draft_input["seo"]:
body_teaser = draft_input.get("body", "")[:300].split("\n")[:-1].join("\n")
draft_input["seo"] = draft_input.get("lead", body_teaser)
# Добавляем текущее время создания
draft_input["created_at"] = int(time.time())
@@ -142,13 +151,34 @@ async def update_draft(_, info, draft_id: int, draft_input):
if not user_id or not author_id:
return {"error": "Author ID are required"}
# Проверяем slug - он должен быть или не пустым, или не передаваться вообще
if "slug" in draft_input and (draft_input["slug"] is None or draft_input["slug"] == ""):
# Если slug пустой, либо удаляем его из входных данных, либо генерируем временный уникальный
# Вариант 1: просто удаляем ключ из входных данных, чтобы оставить старое значение
del draft_input["slug"]
# Вариант 2 (если нужно обновить): генерируем временный уникальный slug
# import uuid
# draft_input["slug"] = f"draft-{uuid.uuid4().hex[:8]}"
with local_session() as session:
draft = session.query(Draft).filter(Draft.id == draft_id).first()
if not draft:
return {"error": "Draft not found"}
if "seo" not in draft_input and not draft.seo:
body_src = draft_input["body"] if "body" in draft_input else draft.body
body_text = trafilatura.extract(body_src)
lead_src = draft_input["lead"] if "lead" in draft_input else draft.lead
lead_text = trafilatura.extract(lead_src)
body_teaser = body_text[:300].split(". ")[:-1].join(".\n")
draft_input["seo"] = lead_text or body_teaser
Draft.update(draft, draft_input)
draft.updated_at = int(time.time())
# Set updated_at and updated_by from the authenticated user
current_time = int(time.time())
draft.updated_at = current_time
draft.updated_by = author_id
session.commit()
return {"draft": draft}
@@ -249,7 +279,6 @@ async def publish_shout(_, info, shout_id: int):
shout.cover = draft.cover
shout.cover_caption = draft.cover_caption
shout.lead = draft.lead
shout.description = draft.description
shout.layout = draft.layout
shout.media = draft.media
shout.lang = draft.lang

View File

@@ -1,6 +1,7 @@
import time
import orjson
import trafilatura
from sqlalchemy import and_, desc, select
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.functions import coalesce
@@ -176,9 +177,16 @@ async def create_shout(_, info, inp):
logger.info(f"Creating shout with input: {inp}")
# Создаем публикацию без topics
body = inp.get("body", "")
lead = inp.get("lead", "")
body_text = trafilatura.extract(body)
lead_text = trafilatura.extract(lead)
seo = inp.get("seo", lead_text or body_text[:300].split(". ")[:-1].join(". "))
new_shout = Shout(
slug=slug,
body=inp.get("body", ""),
body=body,
seo=seo,
lead=lead,
layout=inp.get("layout", "article"),
title=inp.get("title", ""),
created_by=author_id,
@@ -380,7 +388,7 @@ def patch_topics(session, shout, topics_input):
# @login_required
async def update_shout(_, info, shout_id: int, shout_input=None, publish=False):
logger.info(f"Starting update_shout with id={shout_id}, publish={publish}")
logger.debug(f"Full shout_input: {shout_input}")
logger.debug(f"Full shout_input: {shout_input}") # DraftInput
user_id = info.context.get("user_id")
roles = info.context.get("roles", [])

View File

@@ -67,30 +67,35 @@ def add_reaction_stat_columns(q):
return q
def get_reactions_with_stat(q, limit, offset):
def get_reactions_with_stat(q, limit=10, offset=0):
"""
Execute the reaction query and retrieve reactions with statistics.
:param q: Query with reactions and statistics.
:param limit: Number of reactions to load.
:param offset: Pagination offset.
:return: List of reactions.
:return: List of reactions as dictionaries.
>>> get_reactions_with_stat(q, 10, 0) # doctest: +SKIP
[{'id': 1, 'body': 'Текст комментария', 'stat': {'rating': 5, 'comments_count': 3}, ...}]
"""
q = q.limit(limit).offset(offset)
reactions = []
with local_session() as session:
result_rows = session.execute(q)
for reaction, author, shout, commented_stat, rating_stat in result_rows:
for reaction, author, shout, comments_count, rating_stat in result_rows:
# Пропускаем реакции с отсутствующими shout или author
if not shout or not author:
logger.error(f"Пропущена реакция из-за отсутствия shout или author: {reaction.dict()}")
continue
reaction.created_by = author.dict()
reaction.shout = shout.dict()
reaction.stat = {"rating": rating_stat, "comments": commented_stat}
reactions.append(reaction)
# Преобразуем Reaction в словарь для доступа по ключу
reaction_dict = reaction.dict()
reaction_dict["created_by"] = author.dict()
reaction_dict["shout"] = shout.dict()
reaction_dict["stat"] = {"rating": rating_stat, "comments_count": comments_count}
reactions.append(reaction_dict)
return reactions
@@ -393,7 +398,7 @@ async def update_reaction(_, info, reaction):
result = session.execute(reaction_query).unique().first()
if result:
r, author, _shout, commented_stat, rating_stat = result
r, author, _shout, comments_count, rating_stat = result
if not r or not author:
return {"error": "Invalid reaction ID or unauthorized"}
@@ -408,7 +413,7 @@ async def update_reaction(_, info, reaction):
session.commit()
r.stat = {
"commented": commented_stat,
"comments_count": comments_count,
"rating": rating_stat,
}
@@ -713,7 +718,7 @@ async def load_comments_branch(
async def load_replies_count(comments):
"""
Загружает количество ответов для списка комментариев и обновляет поле stat.commented.
Загружает количество ответов для списка комментариев и обновляет поле stat.comments_count.
:param comments: Список комментариев, для которых нужно загрузить количество ответов.
"""
@@ -748,7 +753,7 @@ async def load_replies_count(comments):
comment["stat"] = {}
# Обновляем счетчик комментариев в stat
comment["stat"]["commented"] = replies_count.get(comment["id"], 0)
comment["stat"]["comments_count"] = replies_count.get(comment["id"], 0)
async def load_first_replies(comments, limit, offset, sort="newest"):
@@ -793,8 +798,9 @@ async def load_first_replies(comments, limit, offset, sort="newest"):
q = q.order_by(order_by_stmt, Reaction.reply_to)
# Выполняем запрос
replies = get_reactions_with_stat(q)
# Выполняем запрос - указываем limit для неограниченного количества ответов
# но не более 100 на родительский комментарий
replies = get_reactions_with_stat(q, limit=100, offset=0)
# Группируем ответы по родительским ID
replies_by_parent = {}

View File

@@ -10,7 +10,7 @@ from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import json_array_builder, json_builder, local_session
from services.schema import query
from services.search import search_text
from services.search import search_text, get_search_count
from services.viewed import ViewedStorage
from utils.logger import root_logger as logger
@@ -187,12 +187,10 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
"""
shouts = []
try:
# logger.info(f"Starting get_shouts_with_links with limit={limit}, offset={offset}")
q = q.limit(limit).offset(offset)
with local_session() as session:
shouts_result = session.execute(q).all()
# logger.info(f"Got {len(shouts_result) if shouts_result else 0} shouts from query")
if not shouts_result:
logger.warning("No shouts found in query result")
@@ -203,7 +201,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
shout = None
if hasattr(row, "Shout"):
shout = row.Shout
# logger.debug(f"Processing shout#{shout.id} at index {idx}")
if shout:
shout_id = int(f"{shout.id}")
shout_dict = shout.dict()
@@ -225,26 +222,22 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
elif isinstance(row.stat, dict):
stat = row.stat
viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0
shout_dict["stat"] = {**stat, "viewed": viewed, "commented": stat.get("comments_count", 0)}
shout_dict["stat"] = {**stat, "viewed": viewed}
# Обработка main_topic и topics
topics = None
if has_field(info, "topics") and hasattr(row, "topics"):
topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
# logger.debug(f"Shout#{shout_id} topics: {topics}")
shout_dict["topics"] = topics
if has_field(info, "main_topic"):
main_topic = None
if hasattr(row, "main_topic"):
# logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
main_topic = (
orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
)
# logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")
if not main_topic and topics and len(topics) > 0:
# logger.info(f"No main_topic found for shout#{shout_id}, using first topic from list")
main_topic = {
"id": topics[0]["id"],
"title": topics[0]["title"],
@@ -252,10 +245,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
"is_main": True,
}
elif not main_topic:
logger.warning(f"No main_topic and no topics found for shout#{shout_id}")
main_topic = {"id": 0, "title": "no topic", "slug": "notopic", "is_main": True}
shout_dict["main_topic"] = main_topic
# logger.debug(f"Final main_topic for shout#{shout_id}: {main_topic}")
if has_field(info, "authors") and hasattr(row, "authors"):
shout_dict["authors"] = (
@@ -282,7 +273,6 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
logger.error(f"Fatal error in get_shouts_with_links: {e}", exc_info=True)
raise
finally:
logger.info(f"Returning {len(shouts)} shouts from get_shouts_with_links")
return shouts
@@ -401,8 +391,17 @@ async def load_shouts_search(_, info, text, options):
"""
limit = options.get("limit", 10)
offset = options.get("offset", 0)
if isinstance(text, str) and len(text) > 2:
# Get search results with pagination
results = await search_text(text, limit, offset)
# If no results, return empty list
if not results:
logger.info(f"No search results found for '{text}'")
return []
# Extract IDs and scores
scores = {}
hits_ids = []
for sr in results:
@@ -412,22 +411,42 @@ async def load_shouts_search(_, info, text, options):
scores[shout_id] = sr.get("score")
hits_ids.append(shout_id)
q = (
query_with_stat(info)
if has_field(info, "stat")
else select(Shout).filter(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
)
# Query DB for only the IDs in the current page
q = query_with_stat(info)
q = q.filter(Shout.id.in_(hits_ids))
q = apply_filters(q, options)
q = apply_sorting(q, options)
shouts = get_shouts_with_links(info, q, limit, offset)
q = apply_filters(q, options.get("filters", {}))
#
shouts = get_shouts_with_links(info, q, len(hits_ids), 0)
# Add scores from search results
for shout in shouts:
shout.score = scores[f"{shout.id}"]
shouts.sort(key=lambda x: x.score, reverse=True)
shout_id = str(shout['id'])
shout["score"] = scores.get(shout_id, 0)
# Re-sort by search score to maintain ranking
shouts.sort(key=lambda x: scores.get(str(x['id']), 0), reverse=True)
return shouts
return []
@query.field("get_search_results_count")
async def get_search_results_count(_, info, text):
"""
Returns the total count of search results for a search query.
:param _: Root query object (unused)
:param info: GraphQL context information
:param text: Search query text
:return: Total count of results
"""
if isinstance(text, str) and len(text) > 2:
count = await get_search_count(text)
return {"count": count}
return {"count": 0}
@query.field("load_shouts_unrated")
async def load_shouts_unrated(_, info, options):
"""

View File

@@ -127,6 +127,28 @@ async def get_topics_with_stats(limit=100, offset=0, community_id=None, by=None)
"""
followers_stats = {row[0]: row[1] for row in session.execute(text(followers_stats_query))}
# Запрос на получение статистики авторов для выбранных тем
authors_stats_query = f"""
SELECT st.topic, COUNT(DISTINCT sa.author) as authors_count
FROM shout_topic st
JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL AND s.published_at IS NOT NULL
JOIN shout_author sa ON sa.shout = s.id
WHERE st.topic IN ({",".join(map(str, topic_ids))})
GROUP BY st.topic
"""
authors_stats = {row[0]: row[1] for row in session.execute(text(authors_stats_query))}
# Запрос на получение статистики комментариев для выбранных тем
comments_stats_query = f"""
SELECT st.topic, COUNT(DISTINCT r.id) as comments_count
FROM shout_topic st
JOIN shout s ON st.shout = s.id AND s.deleted_at IS NULL AND s.published_at IS NOT NULL
JOIN reaction r ON r.shout = s.id
WHERE st.topic IN ({",".join(map(str, topic_ids))})
GROUP BY st.topic
"""
comments_stats = {row[0]: row[1] for row in session.execute(text(comments_stats_query))}
# Формируем результат с добавлением статистики
result = []
for topic in topics:
@@ -134,6 +156,8 @@ async def get_topics_with_stats(limit=100, offset=0, community_id=None, by=None)
topic_dict["stat"] = {
"shouts": shouts_stats.get(topic.id, 0),
"followers": followers_stats.get(topic.id, 0),
"authors": authors_stats.get(topic.id, 0),
"comments": comments_stats.get(topic.id, 0),
}
result.append(topic_dict)
@@ -202,23 +226,6 @@ async def get_topics_all(_, _info):
return await get_all_topics()
# Запрос на получение тем с пагинацией и статистикой
@query.field("get_topics_paginated")
async def get_topics_paginated(_, _info, limit=100, offset=0, by=None):
"""
Получает список тем с пагинацией и статистикой.
Args:
limit: Максимальное количество возвращаемых тем
offset: Смещение для пагинации
by: Опциональные параметры сортировки
Returns:
list: Список тем с их статистикой
"""
return await get_topics_with_stats(limit, offset, None, by)
# Запрос на получение тем по сообществу
@query.field("get_topics_by_community")
async def get_topics_by_community(_, _info, community_id: int, limit=100, offset=0, by=None):

View File

@@ -33,7 +33,6 @@ input DraftInput {
main_topic_id: Int # Changed from main_topic: Topic
media: [MediaItemInput] # Changed to use MediaItemInput
lead: String
description: String
subtitle: String
lang: String
seo: String

View File

@@ -33,6 +33,7 @@ type Query {
get_shout(slug: String, shout_id: Int): Shout
load_shouts_by(options: LoadShoutsOptions): [Shout]
load_shouts_search(text: String!, options: LoadShoutsOptions): [SearchResult]
get_search_results_count(text: String!): CountResult!
load_shouts_bookmarked(options: LoadShoutsOptions): [Shout]
# rating
@@ -60,7 +61,7 @@ type Query {
get_topic(slug: String!): Topic
get_topics_all: [Topic]
get_topics_by_author(slug: String, user: String, author_id: Int): [Topic]
get_topics_by_community(slug: String, community_id: Int): [Topic]
get_topics_by_community(community_id: Int!, limit: Int, offset: Int): [Topic]
# notifier
load_notifications(after: Int!, limit: Int, offset: Int): NotificationsResult!

View File

@@ -80,7 +80,6 @@ type Shout {
layout: String!
lead: String
description: String
subtitle: String
lang: String
cover: String
@@ -100,6 +99,7 @@ type Shout {
featured_at: Int
deleted_at: Int
seo: String # generated if not set
version_of: Shout # TODO: use version_of somewhere
draft: Draft
media: [MediaItem]
@@ -111,13 +111,12 @@ type Draft {
id: Int!
created_at: Int!
created_by: Author!
community: Community!
layout: String
slug: String
title: String
subtitle: String
lead: String
description: String
body: String
media: [MediaItem]
cover: String
@@ -137,7 +136,7 @@ type Draft {
type Stat {
rating: Int
commented: Int
comments_count: Int
viewed: Int
last_commented_at: Int
}
@@ -208,6 +207,7 @@ type CommonResult {
}
type SearchResult {
id: Int!
slug: String!
title: String!
cover: String
@@ -275,3 +275,7 @@ type MyRateComment {
my_rate: ReactionKind
}
type CountResult {
count: Int!
}

34
server.py Normal file
View File

@@ -0,0 +1,34 @@
import sys
from pathlib import Path
from granian.constants import Interfaces
from granian.log import LogLevels
from granian.server import Server
from settings import PORT
from utils.logger import root_logger as logger
if __name__ == "__main__":
logger.info("started")
try:
granian_instance = Server(
"main:app",
address="0.0.0.0",
port=PORT,
interface=Interfaces.ASGI,
workers=1,
websockets=False,
log_level=LogLevels.debug,
backlog=2048,
)
if "dev" in sys.argv:
logger.info("dev mode, building ssl context")
granian_instance.build_ssl_context(cert=Path("localhost.pem"), key=Path("localhost-key.pem"), password=None)
granian_instance.serve()
except Exception as error:
logger.error(error, exc_info=True)
raise
finally:
logger.info("stopped")

View File

@@ -259,3 +259,27 @@ def get_json_builder():
# Используем их в коде
json_builder, json_array_builder, json_cast = get_json_builder()
async def fetch_all_shouts(session=None):
"""Fetch all published shouts for search indexing"""
from orm.shout import Shout
close_session = False
if session is None:
session = local_session()
close_session = True
try:
# Fetch only published and non-deleted shouts
query = session.query(Shout).filter(
Shout.published_at.is_not(None),
Shout.deleted_at.is_(None)
)
shouts = query.all()
return shouts
except Exception as e:
logger.error(f"Error fetching shouts for search indexing: {e}")
return []
finally:
if close_session:
session.close()

View File

@@ -2,231 +2,692 @@ import asyncio
import json
import logging
import os
import httpx
import time
from collections import defaultdict
from datetime import datetime, timedelta
import orjson
from opensearchpy import OpenSearch
from services.redis import redis
from utils.encoders import CustomJSONEncoder
# Set redis logging level to suppress DEBUG messages
# Set up proper logging
logger = logging.getLogger("search")
logger.setLevel(logging.WARNING)
logger.setLevel(logging.INFO) # Change to INFO to see more details
ELASTIC_HOST = os.environ.get("ELASTIC_HOST", "").replace("https://", "")
ELASTIC_USER = os.environ.get("ELASTIC_USER", "")
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "")
ELASTIC_PORT = os.environ.get("ELASTIC_PORT", 9200)
ELASTIC_URL = os.environ.get(
"ELASTIC_URL",
f"https://{ELASTIC_USER}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}",
)
REDIS_TTL = 86400 # 1 день в секундах
# Configuration for search service
SEARCH_ENABLED = bool(os.environ.get("SEARCH_ENABLED", "true").lower() in ["true", "1", "yes"])
TXTAI_SERVICE_URL = os.environ.get("TXTAI_SERVICE_URL", "none")
MAX_BATCH_SIZE = int(os.environ.get("SEARCH_MAX_BATCH_SIZE", "25"))
index_settings = {
"settings": {
"index": {"number_of_shards": 1, "auto_expand_replicas": "0-all"},
"analysis": {
"analyzer": {
"ru": {
"tokenizer": "standard",
"filter": ["lowercase", "ru_stop", "ru_stemmer"],
}
},
"filter": {
"ru_stemmer": {"type": "stemmer", "language": "russian"},
"ru_stop": {"type": "stop", "stopwords": "_russian_"},
},
},
},
"mappings": {
"properties": {
"body": {"type": "text", "analyzer": "ru"},
"title": {"type": "text", "analyzer": "ru"},
"subtitle": {"type": "text", "analyzer": "ru"},
"lead": {"type": "text", "analyzer": "ru"},
"media": {"type": "text", "analyzer": "ru"},
}
},
}
# Search cache configuration
SEARCH_CACHE_ENABLED = bool(os.environ.get("SEARCH_CACHE_ENABLED", "true").lower() in ["true", "1", "yes"])
SEARCH_CACHE_TTL_SECONDS = int(os.environ.get("SEARCH_CACHE_TTL_SECONDS", "900")) # Default: 15 minutes
SEARCH_MIN_SCORE = float(os.environ.get("SEARCH_MIN_SCORE", "0.1"))
SEARCH_PREFETCH_SIZE = int(os.environ.get("SEARCH_PREFETCH_SIZE", "200"))
SEARCH_USE_REDIS = bool(os.environ.get("SEARCH_USE_REDIS", "true").lower() in ["true", "1", "yes"])
expected_mapping = index_settings["mappings"]
search_offset = 0
# Создание цикла событий
search_loop = asyncio.get_event_loop()
# Import Redis client if Redis caching is enabled
if SEARCH_USE_REDIS:
try:
from services.redis import redis
logger.info("Redis client imported for search caching")
except ImportError:
logger.warning("Redis client import failed, falling back to memory cache")
SEARCH_USE_REDIS = False
# В начале файла добавим флаг
SEARCH_ENABLED = bool(os.environ.get("ELASTIC_HOST", ""))
def get_indices_stats():
indices_stats = search_service.client.cat.indices(format="json")
for index_info in indices_stats:
index_name = index_info["index"]
if not index_name.startswith("."):
index_health = index_info["health"]
index_status = index_info["status"]
pri_shards = index_info["pri"]
rep_shards = index_info["rep"]
docs_count = index_info["docs.count"]
docs_deleted = index_info["docs.deleted"]
store_size = index_info["store.size"]
pri_store_size = index_info["pri.store.size"]
logger.info(f"Index: {index_name}")
logger.info(f"Health: {index_health}")
logger.info(f"Status: {index_status}")
logger.info(f"Primary Shards: {pri_shards}")
logger.info(f"Replica Shards: {rep_shards}")
logger.info(f"Documents Count: {docs_count}")
logger.info(f"Deleted Documents: {docs_deleted}")
logger.info(f"Store Size: {store_size}")
logger.info(f"Primary Store Size: {pri_store_size}")
class SearchCache:
"""Cache for search results to enable efficient pagination"""
def __init__(self, ttl_seconds=SEARCH_CACHE_TTL_SECONDS, max_items=100):
self.cache = {} # Maps search query to list of results
self.last_accessed = {} # Maps search query to last access timestamp
self.ttl = ttl_seconds
self.max_items = max_items
self._redis_prefix = "search_cache:"
async def store(self, query, results):
"""Store search results for a query"""
normalized_query = self._normalize_query(query)
if SEARCH_USE_REDIS:
try:
serialized_results = json.dumps(results)
await redis.set(
f"{self._redis_prefix}{normalized_query}",
serialized_results,
ex=self.ttl
)
logger.info(f"Stored {len(results)} search results for query '{query}' in Redis")
return True
except Exception as e:
logger.error(f"Error storing search results in Redis: {e}")
# Fall back to memory cache if Redis fails
# First cleanup if needed for memory cache
if len(self.cache) >= self.max_items:
self._cleanup()
# Store results and update timestamp
self.cache[normalized_query] = results
self.last_accessed[normalized_query] = time.time()
logger.info(f"Cached {len(results)} search results for query '{query}' in memory")
return True
async def get(self, query, limit=10, offset=0):
"""Get paginated results for a query"""
normalized_query = self._normalize_query(query)
all_results = None
# Try to get from Redis first
if SEARCH_USE_REDIS:
try:
cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}")
if cached_data:
all_results = json.loads(cached_data)
logger.info(f"Retrieved search results for '{query}' from Redis")
except Exception as e:
logger.error(f"Error retrieving search results from Redis: {e}")
# Fall back to memory cache if not in Redis
if all_results is None and normalized_query in self.cache:
all_results = self.cache[normalized_query]
self.last_accessed[normalized_query] = time.time()
logger.info(f"Retrieved search results for '{query}' from memory cache")
# If not found in any cache
if all_results is None:
logger.info(f"Cache miss for query '{query}'")
return None
# Return paginated subset
end_idx = min(offset + limit, len(all_results))
if offset >= len(all_results):
logger.warning(f"Requested offset {offset} exceeds result count {len(all_results)}")
return []
logger.info(f"Cache hit for '{query}': serving {offset}:{end_idx} of {len(all_results)} results")
return all_results[offset:end_idx]
async def has_query(self, query):
"""Check if query exists in cache"""
normalized_query = self._normalize_query(query)
# Check Redis first
if SEARCH_USE_REDIS:
try:
exists = await redis.get(f"{self._redis_prefix}{normalized_query}")
if exists:
return True
except Exception as e:
logger.error(f"Error checking Redis for query existence: {e}")
# Fall back to memory cache
return normalized_query in self.cache
async def get_total_count(self, query):
"""Get total count of results for a query"""
normalized_query = self._normalize_query(query)
# Check Redis first
if SEARCH_USE_REDIS:
try:
cached_data = await redis.get(f"{self._redis_prefix}{normalized_query}")
if cached_data:
all_results = json.loads(cached_data)
return len(all_results)
except Exception as e:
logger.error(f"Error getting result count from Redis: {e}")
# Fall back to memory cache
if normalized_query in self.cache:
return len(self.cache[normalized_query])
return 0
def _normalize_query(self, query):
"""Normalize query string for cache key"""
if not query:
return ""
# Simple normalization - lowercase and strip whitespace
return query.lower().strip()
def _cleanup(self):
"""Remove oldest entries if memory cache is full"""
now = time.time()
# First remove expired entries
expired_keys = [
key for key, last_access in self.last_accessed.items()
if now - last_access > self.ttl
]
for key in expired_keys:
if key in self.cache:
del self.cache[key]
if key in self.last_accessed:
del self.last_accessed[key]
logger.info(f"Cleaned up {len(expired_keys)} expired search cache entries")
# If still above max size, remove oldest entries
if len(self.cache) >= self.max_items:
# Sort by last access time
sorted_items = sorted(self.last_accessed.items(), key=lambda x: x[1])
# Remove oldest 20%
remove_count = max(1, int(len(sorted_items) * 0.2))
for key, _ in sorted_items[:remove_count]:
if key in self.cache:
del self.cache[key]
if key in self.last_accessed:
del self.last_accessed[key]
logger.info(f"Removed {remove_count} oldest search cache entries")
class SearchService:
def __init__(self, index_name="search_index"):
logger.info("Инициализируем поиск...")
self.index_name = index_name
self.client = None
self.lock = asyncio.Lock()
# Инициализация клиента OpenSearch только если поиск включен
if SEARCH_ENABLED:
try:
self.client = OpenSearch(
hosts=[{"host": ELASTIC_HOST, "port": ELASTIC_PORT}],
http_compress=True,
http_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
logger.info("Клиент OpenSearch.org подключен")
search_loop.create_task(self.check_index())
except Exception as exc:
logger.warning(f"Поиск отключен из-за ошибки подключения: {exc}")
self.client = None
else:
logger.info("Поиск отключен (ELASTIC_HOST не установлен)")
def __init__(self):
logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}")
self.available = SEARCH_ENABLED
# Use different timeout settings for indexing and search requests
self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL)
self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL)
# Initialize search cache
self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None
if not self.available:
logger.info("Search disabled (SEARCH_ENABLED = False)")
if SEARCH_CACHE_ENABLED:
cache_location = "Redis" if SEARCH_USE_REDIS else "Memory"
logger.info(f"Search caching enabled using {cache_location} cache with TTL={SEARCH_CACHE_TTL_SECONDS}s")
logger.info(f"Minimum score filter: {SEARCH_MIN_SCORE}, prefetch size: {SEARCH_PREFETCH_SIZE}")
async def info(self):
if not SEARCH_ENABLED:
"""Return information about search service"""
if not self.available:
return {"status": "disabled"}
try:
return get_indices_stats()
response = await self.client.get("/info")
response.raise_for_status()
result = response.json()
logger.info(f"Search service info: {result}")
return result
except Exception as e:
logger.error(f"Failed to get search info: {e}")
return {"status": "error", "message": str(e)}
def delete_index(self):
if self.client:
logger.warning(f"[!!!] Удаляем индекс {self.index_name}")
self.client.indices.delete(index=self.index_name, ignore_unavailable=True)
def create_index(self):
if self.client:
logger.info(f"Создается индекс: {self.index_name}")
self.client.indices.create(index=self.index_name, body=index_settings)
logger.info(f"Индекс {self.index_name} создан")
async def check_index(self):
if self.client:
logger.info(f"Проверяем индекс {self.index_name}...")
if not self.client.indices.exists(index=self.index_name):
self.create_index()
self.client.indices.put_mapping(index=self.index_name, body=expected_mapping)
else:
logger.info(f"Найден существующий индекс {self.index_name}")
# Проверка и обновление структуры индекса, если необходимо
result = self.client.indices.get_mapping(index=self.index_name)
if isinstance(result, str):
result = orjson.loads(result)
if isinstance(result, dict):
mapping = result.get(self.index_name, {}).get("mappings")
logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}")
expected_keys = expected_mapping["properties"].keys()
if mapping and mapping["properties"].keys() != expected_keys:
logger.info(f"Ожидаемая структура индексации: {expected_mapping}")
logger.warning("[!!!] Требуется переиндексация всех данных")
self.delete_index()
self.client = None
else:
logger.error("клиент не инициализован, невозможно проверить индекс")
def is_ready(self):
"""Check if service is available"""
return self.available
async def verify_docs(self, doc_ids):
"""Verify which documents exist in the search index"""
if not self.available:
return {"status": "disabled"}
try:
logger.info(f"Verifying {len(doc_ids)} documents in search index")
response = await self.client.post(
"/verify-docs",
json={"doc_ids": doc_ids},
timeout=60.0 # Longer timeout for potentially large ID lists
)
response.raise_for_status()
result = response.json()
# Log summary of verification results
missing_count = len(result.get("missing", []))
logger.info(f"Document verification complete: {missing_count} missing out of {len(doc_ids)} total")
return result
except Exception as e:
logger.error(f"Document verification error: {e}")
return {"status": "error", "message": str(e)}
def index(self, shout):
if not SEARCH_ENABLED:
"""Index a single document"""
if not self.available:
return
logger.info(f"Indexing post {shout.id}")
# Start in background to not block
asyncio.create_task(self.perform_index(shout))
if self.client:
logger.info(f"Индексируем пост {shout.id}")
index_body = {
"body": shout.body,
"title": shout.title,
"subtitle": shout.subtitle,
"lead": shout.lead,
"media": shout.media,
}
asyncio.create_task(self.perform_index(shout, index_body))
async def perform_index(self, shout):
"""Actually perform the indexing operation"""
if not self.available:
return
try:
# Repeat title 3 times for higher keyword relevance
title_repeat = ". ".join(filter(None, [shout.title] * 3))
async def perform_index(self, shout, index_body):
if self.client:
# Combine all text fields
text_parts = [
title_repeat,
shout.subtitle or "",
shout.lead or "",
shout.body or "",
shout.media or ""
]
text = " ".join(filter(None, text_parts))
if not text.strip():
logger.warning(f"No text content to index for shout {shout.id}")
return
logger.info(f"Indexing document: ID={shout.id}, Text length={len(text)}")
# Send to txtai service
response = await self.client.post(
"/index",
json={"id": str(shout.id), "text": text}
)
response.raise_for_status()
result = response.json()
logger.info(f"Post {shout.id} successfully indexed: {result}")
except Exception as e:
logger.error(f"Indexing error for shout {shout.id}: {e}")
async def bulk_index(self, shouts):
"""Index multiple documents at once with adaptive batch sizing"""
if not self.available or not shouts:
logger.warning(f"Bulk indexing skipped: available={self.available}, shouts_count={len(shouts) if shouts else 0}")
return
start_time = time.time()
logger.info(f"Starting bulk indexing of {len(shouts)} documents")
MAX_TEXT_LENGTH = 4000 # Maximum text length to send in a single request
max_batch_size = MAX_BATCH_SIZE
total_indexed = 0
total_skipped = 0
total_truncated = 0
total_retries = 0
# Group documents by size to process smaller documents in larger batches
small_docs = []
medium_docs = []
large_docs = []
# First pass: prepare all documents and categorize by size
for shout in shouts:
try:
await asyncio.wait_for(
self.client.index(index=self.index_name, id=str(shout.id), body=index_body), timeout=40.0
)
except asyncio.TimeoutError:
logger.error(f"Indexing timeout for shout {shout.id}")
text_fields = []
# Repeat title 3 times
title_repeat = ". ".join(filter(None, [getattr(shout, "title", None)] * 3))
text_fields = [title_repeat]
for field_name in ['subtitle', 'lead', 'body']:
field_value = getattr(shout, field_name, None)
if field_value and isinstance(field_value, str) and field_value.strip():
text_fields.append(field_value.strip())
# Media field processing remains the same
media = getattr(shout, 'media', None)
if media:
if isinstance(media, str):
try:
media_json = json.loads(media)
if isinstance(media_json, dict):
if 'title' in media_json:
text_fields.append(media_json['title'])
if 'body' in media_json:
text_fields.append(media_json['body'])
except json.JSONDecodeError:
text_fields.append(media)
elif isinstance(media, dict):
if 'title' in media:
text_fields.append(media['title'])
if 'body' in media:
text_fields.append(media['body'])
text = " ".join(text_fields)
if not text.strip():
total_skipped += 1
continue
# Truncate text if it exceeds the maximum length
original_length = len(text)
if original_length > MAX_TEXT_LENGTH:
text = text[:MAX_TEXT_LENGTH]
total_truncated += 1
document = {
"id": str(shout.id),
"text": text
}
# Categorize by size
text_len = len(text)
if text_len > 5000:
large_docs.append(document)
elif text_len > 2000:
medium_docs.append(document)
else:
small_docs.append(document)
total_indexed += 1
except Exception as e:
logger.error(f"Indexing error for shout {shout.id}: {e}")
logger.error(f"Error processing shout {getattr(shout, 'id', 'unknown')} for indexing: {e}")
total_skipped += 1
# Process each category with appropriate batch sizes
logger.info(f"Documents categorized: {len(small_docs)} small, {len(medium_docs)} medium, {len(large_docs)} large")
# Process small documents (larger batches)
if small_docs:
batch_size = min(max_batch_size, 15)
await self._process_document_batches(small_docs, batch_size, "small")
# Process medium documents (medium batches)
if medium_docs:
batch_size = min(max_batch_size, 10)
await self._process_document_batches(medium_docs, batch_size, "medium")
# Process large documents (small batches)
if large_docs:
batch_size = min(max_batch_size, 3)
await self._process_document_batches(large_docs, batch_size, "large")
elapsed = time.time() - start_time
logger.info(f"Bulk indexing completed in {elapsed:.2f}s: {total_indexed} indexed, {total_skipped} skipped, {total_truncated} truncated, {total_retries} retries")
async def _process_document_batches(self, documents, batch_size, size_category):
"""Process document batches with retry logic"""
# Check for possible database corruption before starting
db_error_count = 0
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
batch_id = f"{size_category}-{i//batch_size + 1}"
logger.info(f"Processing {size_category} batch {batch_id} of {len(batch)} documents")
retry_count = 0
max_retries = 3
success = False
# Process with retries
while not success and retry_count < max_retries:
try:
logger.info(f"Sending batch {batch_id} of {len(batch)} documents to search service (attempt {retry_count+1})")
response = await self.index_client.post(
"/bulk-index",
json=batch,
timeout=120.0 # Explicit longer timeout for large batches
)
# Handle 422 validation errors - these won't be fixed by retrying
if response.status_code == 422:
error_detail = response.json()
truncated_error = self._truncate_error_detail(error_detail)
logger.error(f"Validation error from search service for batch {batch_id}: {truncated_error}")
break
# Handle 500 server errors - these might be fixed by retrying with smaller batches
elif response.status_code == 500:
db_error_count += 1
# If we've seen multiple 500s, log a critical error
if db_error_count >= 3:
logger.critical(f"Multiple server errors detected (500). The search service may need manual intervention. Stopping batch {batch_id} processing.")
break
# Try again with exponential backoff
if retry_count < max_retries - 1:
retry_count += 1
wait_time = (2 ** retry_count) + (random.random() * 0.5) # Exponential backoff with jitter
await asyncio.sleep(wait_time)
continue
# Final retry, split the batch
elif len(batch) > 1:
mid = len(batch) // 2
await self._process_single_batch(batch[:mid], f"{batch_id}-A")
await self._process_single_batch(batch[mid:], f"{batch_id}-B")
break
else:
# Can't split a single document
break
# Normal success case
response.raise_for_status()
success = True
db_error_count = 0 # Reset error counter on success
except Exception as e:
error_str = str(e).lower()
if "duplicate key" in error_str or "unique constraint" in error_str or "nonetype" in error_str:
db_error_count += 1
if db_error_count >= 2:
logger.critical(f"Potential database corruption detected: {error_str}. The search service may need manual intervention. Stopping batch {batch_id} processing.")
break
if retry_count < max_retries - 1:
retry_count += 1
wait_time = (2 ** retry_count) + (random.random() * 0.5)
await asyncio.sleep(wait_time)
else:
if len(batch) > 1:
mid = len(batch) // 2
await self._process_single_batch(batch[:mid], f"{batch_id}-A")
await self._process_single_batch(batch[mid:], f"{batch_id}-B")
break
async def _process_single_batch(self, documents, batch_id):
"""Process a single batch with maximum reliability"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
if not documents:
return
response = await self.index_client.post(
"/bulk-index",
json=documents,
timeout=90.0
)
response.raise_for_status()
return # Success, exit the retry loop
except Exception as e:
error_str = str(e).lower()
retry_count += 1
if "dictionary changed size" in error_str or "transaction error" in error_str:
wait_time = (2 ** retry_count) + (random.random() * 0.5)
await asyncio.sleep(wait_time) # Wait for txtai to recover
continue
if retry_count >= max_retries and len(documents) > 1:
for i, doc in enumerate(documents):
try:
resp = await self.index_client.post("/index", json=doc, timeout=30.0)
resp.raise_for_status()
except Exception as e2:
pass
return # Exit after individual processing attempt
def _truncate_error_detail(self, error_detail):
"""Truncate error details for logging"""
truncated_detail = error_detail.copy() if isinstance(error_detail, dict) else error_detail
if isinstance(truncated_detail, dict) and 'detail' in truncated_detail and isinstance(truncated_detail['detail'], list):
for i, item in enumerate(truncated_detail['detail']):
if isinstance(item, dict) and 'input' in item:
if isinstance(item['input'], dict) and any(k in item['input'] for k in ['documents', 'text']):
if 'documents' in item['input'] and isinstance(item['input']['documents'], list):
for j, doc in enumerate(item['input']['documents']):
if 'text' in doc and isinstance(doc['text'], str) and len(doc['text']) > 100:
item['input']['documents'][j]['text'] = f"{doc['text'][:100]}... [truncated, total {len(doc['text'])} chars]"
if 'text' in item['input'] and isinstance(item['input']['text'], str) and len(item['input']['text']) > 100:
item['input']['text'] = f"{item['input']['text'][:100]}... [truncated, total {len(item['input']['text'])} chars]"
return truncated_detail
async def search(self, text, limit, offset):
if not SEARCH_ENABLED:
"""Search documents"""
if not self.available:
return []
if not isinstance(text, str) or not text.strip():
return []
logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset})")
# Check if we can serve from cache
if SEARCH_CACHE_ENABLED:
has_cache = await self.cache.has_query(text)
if has_cache:
cached_results = await self.cache.get(text, limit, offset)
if cached_results is not None:
return cached_results
# Not in cache or cache disabled, perform new search
try:
search_limit = limit
search_offset = offset
logger.info(f"Ищем: {text} {offset}+{limit}")
search_body = {
"query": {"multi_match": {"query": text, "fields": ["title", "lead", "subtitle", "body", "media"]}}
}
if self.client:
search_response = self.client.search(
index=self.index_name,
body=search_body,
size=limit,
from_=offset,
_source=False,
_source_excludes=["title", "body", "subtitle", "media", "lead", "_index"],
if SEARCH_CACHE_ENABLED:
search_limit = SEARCH_PREFETCH_SIZE
search_offset = 0
else:
search_limit = limit
search_offset = offset
response = await self.client.post(
"/search",
json={"text": text, "limit": search_limit, "offset": search_offset}
)
hits = search_response["hits"]["hits"]
results = [{"id": hit["_id"], "score": hit["_score"]} for hit in hits]
# если результаты не пустые
if results:
# Кэширование в Redis с TTL
redis_key = f"search:{text}:{offset}+{limit}"
await redis.execute(
"SETEX",
redis_key,
REDIS_TTL,
json.dumps(results, cls=CustomJSONEncoder),
)
return results
return []
response.raise_for_status()
result = response.json()
formatted_results = result.get("results", [])
valid_results = []
for item in formatted_results:
doc_id = item.get("id")
if doc_id and doc_id.isdigit():
valid_results.append(item)
if len(valid_results) != len(formatted_results):
formatted_results = valid_results
if SEARCH_MIN_SCORE > 0:
initial_count = len(formatted_results)
formatted_results = [r for r in formatted_results if r.get("score", 0) >= SEARCH_MIN_SCORE]
if SEARCH_CACHE_ENABLED:
await self.cache.store(text, formatted_results)
end_idx = offset + limit
page_results = formatted_results[offset:end_idx]
return page_results
return formatted_results
except Exception as e:
logger.error(f"Search error for '{text}': {e}", exc_info=True)
return []
async def check_index_status(self):
"""Get detailed statistics about the search index health"""
if not self.available:
return {"status": "disabled"}
try:
response = await self.client.get("/index-status")
response.raise_for_status()
result = response.json()
if result.get("consistency", {}).get("status") != "ok":
null_count = result.get("consistency", {}).get("null_embeddings_count", 0)
if null_count > 0:
logger.warning(f"Found {null_count} documents with NULL embeddings")
return result
except Exception as e:
logger.error(f"Failed to check index status: {e}")
return {"status": "error", "message": str(e)}
# Create the search service singleton
search_service = SearchService()
# API-compatible function to perform a search
async def search_text(text: str, limit: int = 50, offset: int = 0):
payload = []
if search_service.client:
# Использование метода search_post из OpenSearchService
if search_service.available:
payload = await search_service.search(text, limit, offset)
return payload
async def get_search_count(text: str):
"""Get total count of results for a query without fetching all results"""
if search_service.available and SEARCH_CACHE_ENABLED:
if await search_service.cache.has_query(text):
return await search_service.cache.get_total_count(text)
results = await search_text(text, SEARCH_PREFETCH_SIZE, 0)
return len(results)
# Проверить что URL корректный
OPENSEARCH_URL = os.getenv("OPENSEARCH_URL", "rc1a-3n5pi3bhuj9gieel.mdb.yandexcloud.net")
async def initialize_search_index(shouts_data):
"""Initialize search index with existing data during application startup"""
if not SEARCH_ENABLED:
return
if not shouts_data:
return
info = await search_service.info()
if info.get("status") in ["error", "unavailable", "disabled"]:
return
index_stats = info.get("index_stats", {})
indexed_doc_count = index_stats.get("document_count", 0)
index_status = await search_service.check_index_status()
if index_status.get("status") == "inconsistent":
problem_ids = index_status.get("consistency", {}).get("null_embeddings_sample", [])
if problem_ids:
problem_docs = [shout for shout in shouts_data if str(shout.id) in problem_ids]
if problem_docs:
await search_service.bulk_index(problem_docs)
db_ids = [str(shout.id) for shout in shouts_data]
try:
numeric_ids = [int(sid) for sid in db_ids if sid.isdigit()]
if numeric_ids:
min_id = min(numeric_ids)
max_id = max(numeric_ids)
id_range = max_id - min_id + 1
except Exception as e:
pass
if abs(indexed_doc_count - len(shouts_data)) > 10:
doc_ids = [str(shout.id) for shout in shouts_data]
verification = await search_service.verify_docs(doc_ids)
if verification.get("status") == "error":
return
missing_ids = verification.get("missing", [])
if missing_ids:
missing_docs = [shout for shout in shouts_data if str(shout.id) in missing_ids]
await search_service.bulk_index(missing_docs)
else:
pass
try:
test_query = "test"
test_results = await search_text(test_query, 5)
if test_results:
categories = set()
for result in test_results:
result_id = result.get("id")
matching_shouts = [s for s in shouts_data if str(s.id) == result_id]
if matching_shouts and hasattr(matching_shouts[0], 'category'):
categories.add(getattr(matching_shouts[0], 'category', 'unknown'))
except Exception as e:
pass

View File

@@ -2,9 +2,7 @@ import asyncio
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Dict
import orjson
from typing import Dict, Optional
# ga
from google.analytics.data_v1beta import BetaAnalyticsDataClient
@@ -20,33 +18,39 @@ from orm.author import Author
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import local_session
from services.redis import redis
from utils.logger import root_logger as logger
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", "/dump/google-service.json")
GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID", "")
VIEWS_FILEPATH = "/dump/views.json"
class ViewedStorage:
"""
Класс для хранения и доступа к данным о просмотрах.
Использует Redis в качестве основного хранилища и Google Analytics для сбора новых данных.
"""
lock = asyncio.Lock()
precounted_by_slug = {}
views_by_shout = {}
shouts_by_topic = {}
shouts_by_author = {}
views = None
period = 60 * 60 # каждый час
analytics_client: BetaAnalyticsDataClient | None = None
analytics_client: Optional[BetaAnalyticsDataClient] = None
auth_result = None
running = False
redis_views_key = None
last_update_timestamp = 0
start_date = datetime.now().strftime("%Y-%m-%d")
@staticmethod
async def init():
"""Подключение к клиенту Google Analytics с использованием аутентификации"""
"""Подключение к клиенту Google Analytics и загрузка данных о просмотрах из Redis"""
self = ViewedStorage
async with self.lock:
# Загрузка предварительно подсчитанных просмотров из файла JSON
self.load_precounted_views()
# Загрузка предварительно подсчитанных просмотров из Redis
await self.load_views_from_redis()
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH)
if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
@@ -62,40 +66,54 @@ class ViewedStorage:
self.running = False
@staticmethod
def load_precounted_views():
"""Загрузка предварительно подсчитанных просмотров из файла JSON"""
async def load_views_from_redis():
"""Загрузка предварительно подсчитанных просмотров из Redis"""
self = ViewedStorage
viewfile_path = VIEWS_FILEPATH
if not os.path.exists(viewfile_path):
viewfile_path = os.path.join(os.path.curdir, "views.json")
if not os.path.exists(viewfile_path):
logger.warning(" * views.json not found")
return
logger.info(f" * loading views from {viewfile_path}")
try:
start_date_int = os.path.getmtime(viewfile_path)
start_date_str = datetime.fromtimestamp(start_date_int).strftime("%Y-%m-%d")
self.start_date = start_date_str
# Подключаемся к Redis если соединение не установлено
if not redis._client:
await redis.connect()
# Получаем список всех ключей migrated_views_* и находим самый последний
keys = await redis.execute("KEYS", "migrated_views_*")
if not keys:
logger.warning(" * No migrated_views keys found in Redis")
return
# Фильтруем только ключи timestamp формата (исключаем migrated_views_slugs)
timestamp_keys = [k for k in keys if k != "migrated_views_slugs"]
if not timestamp_keys:
logger.warning(" * No migrated_views timestamp keys found in Redis")
return
# Сортируем по времени создания (в названии ключа) и берем последний
timestamp_keys.sort()
latest_key = timestamp_keys[-1]
self.redis_views_key = latest_key
# Получаем метку времени создания для установки start_date
timestamp = await redis.execute("HGET", latest_key, "_timestamp")
if timestamp:
self.last_update_timestamp = int(timestamp)
timestamp_dt = datetime.fromtimestamp(int(timestamp))
self.start_date = timestamp_dt.strftime("%Y-%m-%d")
# Если данные сегодняшние, считаем их актуальными
now_date = datetime.now().strftime("%Y-%m-%d")
if now_date == self.start_date:
logger.info(" * views data is up to date!")
logger.info(" * Views data is up to date!")
else:
logger.warn(f" * {viewfile_path} is too old: {self.start_date}")
logger.warning(f" * Views data is from {self.start_date}, may need update")
with open(viewfile_path, "r") as file:
precounted_views = orjson.loads(file.read())
self.precounted_by_slug.update(precounted_views)
logger.info(f" * {len(precounted_views)} shouts with views was loaded.")
except Exception as e:
logger.error(f"precounted views loading error: {e}")
# Выводим информацию о количестве загруженных записей
total_entries = await redis.execute("HGET", latest_key, "_total")
if total_entries:
logger.info(f" * {total_entries} shouts with views loaded from Redis key: {latest_key}")
# noinspection PyTypeChecker
@staticmethod
async def update_pages():
"""Запрос всех страниц от Google Analytics, отсортрованных по количеству просмотров"""
"""Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
self = ViewedStorage
logger.info(" ⎧ views update from Google Analytics ---")
if self.running:
@@ -140,15 +158,40 @@ class ViewedStorage:
self.running = False
@staticmethod
def get_shout(shout_slug="", shout_id=0) -> int:
"""Получение метрики просмотров shout по slug или id."""
async def get_shout(shout_slug="", shout_id=0) -> int:
"""
Получение метрики просмотров shout по slug или id.
Args:
shout_slug: Slug публикации
shout_id: ID публикации
Returns:
int: Количество просмотров
"""
self = ViewedStorage
# Получаем данные из Redis для новой схемы хранения
if not redis._client:
await redis.connect()
fresh_views = self.views_by_shout.get(shout_slug, 0)
precounted_views = self.precounted_by_slug.get(shout_slug, 0)
return fresh_views + precounted_views
# Если есть id, пытаемся получить данные из Redis по ключу migrated_views_<timestamp>
if shout_id and self.redis_views_key:
precounted_views = await redis.execute("HGET", self.redis_views_key, str(shout_id))
if precounted_views:
return fresh_views + int(precounted_views)
# Если нет id или данных, пытаемся получить по slug из отдельного хеша
precounted_views = await redis.execute("HGET", "migrated_views_slugs", shout_slug)
if precounted_views:
return fresh_views + int(precounted_views)
return fresh_views
@staticmethod
def get_shout_media(shout_slug) -> Dict[str, int]:
async def get_shout_media(shout_slug) -> Dict[str, int]:
"""Получение метрики воспроизведения shout по slug."""
self = ViewedStorage
@@ -157,23 +200,29 @@ class ViewedStorage:
return self.views_by_shout.get(shout_slug, 0)
@staticmethod
def get_topic(topic_slug) -> int:
async def get_topic(topic_slug) -> int:
"""Получение суммарного значения просмотров темы."""
self = ViewedStorage
return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_topic.get(topic_slug, []))
views_count = 0
for shout_slug in self.shouts_by_topic.get(topic_slug, []):
views_count += await self.get_shout(shout_slug=shout_slug)
return views_count
@staticmethod
def get_author(author_slug) -> int:
async def get_author(author_slug) -> int:
"""Получение суммарного значения просмотров автора."""
self = ViewedStorage
return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_author.get(author_slug, []))
views_count = 0
for shout_slug in self.shouts_by_author.get(author_slug, []):
views_count += await self.get_shout(shout_slug=shout_slug)
return views_count
@staticmethod
def update_topics(shout_slug):
"""Обновление счетчиков темы по slug shout"""
self = ViewedStorage
with local_session() as session:
# Определение вспомогательной функции для избежа<EFBFBD><EFBFBD>ия повторения кода
# Определение вспомогательной функции для избежания повторения кода
def update_groups(dictionary, key, value):
dictionary[key] = list(set(dictionary.get(key, []) + [value]))