This commit is contained in:
parent
09a6d085fd
commit
11611fd577
2
cache/revalidator.py
vendored
2
cache/revalidator.py
vendored
|
@ -52,7 +52,7 @@ class CacheRevalidationManager:
|
|||
async def stop(self):
|
||||
"""Остановка фонового воркера."""
|
||||
self.running = False
|
||||
if hasattr(self, 'task'):
|
||||
if hasattr(self, "task"):
|
||||
self.task.cancel()
|
||||
try:
|
||||
await self.task
|
||||
|
|
6
main.py
6
main.py
|
@ -86,11 +86,7 @@ async def lifespan(app):
|
|||
)
|
||||
yield
|
||||
finally:
|
||||
tasks = [
|
||||
redis.disconnect(),
|
||||
ViewedStorage.stop(),
|
||||
revalidation_manager.stop()
|
||||
]
|
||||
tasks = [redis.disconnect(), ViewedStorage.stop(), revalidation_manager.stop()]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
|
||||
|
|
|
@ -84,9 +84,10 @@ async def follow(_, info, what, slug):
|
|||
logger.debug("Обновление кэша")
|
||||
await cache_method(entity_dict)
|
||||
if get_cached_follows_method:
|
||||
logger.debug("Получение обновленных подписок из кэша")
|
||||
follows = await get_cached_follows_method(follower_id)
|
||||
# logger.debug(f"Текущие подписки: {follows}")
|
||||
logger.debug("Получение подписок из кэша")
|
||||
existing_follows = await get_cached_follows_method(follower_id)
|
||||
follows = [*existing_follows, entity_dict]
|
||||
logger.debug("Обновлен список подписок")
|
||||
|
||||
# Уведомление автора (только для типа AUTHOR)
|
||||
if what == "AUTHOR":
|
||||
|
@ -97,7 +98,7 @@ async def follow(_, info, what, slug):
|
|||
logger.exception("Произошла ошибка в функции 'follow'")
|
||||
return {"error": str(exc)}
|
||||
|
||||
logger.debug(f"Функция 'follow' завершена успешно с результатом: {what.lower()}s={follows}")
|
||||
# logger.debug(f"Функция 'follow' завершена успешно с результатом: {what.lower()}s={follows}")
|
||||
return {f"{what.lower()}s": follows}
|
||||
|
||||
|
||||
|
@ -120,11 +121,7 @@ async def unfollow(_, info, what, slug):
|
|||
"AUTHOR": (Author, AuthorFollower, get_cached_follower_authors, cache_author),
|
||||
"TOPIC": (Topic, TopicFollower, get_cached_follower_topics, cache_topic),
|
||||
"COMMUNITY": (Community, CommunityFollower, None, None), # Нет методов кэша для сообщества
|
||||
"SHOUT": (
|
||||
Shout,
|
||||
ShoutReactionsFollower,
|
||||
None,
|
||||
), # Нет методов кэша для shout
|
||||
"SHOUT": (Shout, ShoutReactionsFollower, None, None), # Нет методов кэша для shout
|
||||
}
|
||||
|
||||
if what not in entity_classes:
|
||||
|
@ -170,9 +167,10 @@ async def unfollow(_, info, what, slug):
|
|||
logger.debug("Обновление кэша после отписки")
|
||||
await cache_method(entity.dict())
|
||||
if get_cached_follows_method:
|
||||
logger.debug("Получение обновленных подписок из кэша")
|
||||
follows = await get_cached_follows_method(follower_id)
|
||||
# logger.debug(f"Текущие подписки: {follows}")
|
||||
logger.debug("Получение подписок из кэша")
|
||||
existing_follows = await get_cached_follows_method(follower_id)
|
||||
follows = filter(lambda x: x.id != entity_id, existing_follows)
|
||||
logger.debug("Обновлен список подписок")
|
||||
|
||||
if what == "AUTHOR":
|
||||
logger.debug("Отправка уведомления автору об отписке")
|
||||
|
@ -181,10 +179,11 @@ async def unfollow(_, info, what, slug):
|
|||
except Exception as exc:
|
||||
logger.exception("Произошла ошибка в функции 'unfollow'")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
return {"error": str(exc)}
|
||||
|
||||
logger.debug(f"Функция 'unfollow' завершена успешно с результатом: {entity_type}s={follows}, error={error}")
|
||||
# logger.debug(f"Функция 'unfollow' завершена успешно с результатом: {entity_type}s={follows}, error={error}")
|
||||
return {f"{entity_type}s": follows, "error": error}
|
||||
|
||||
|
||||
|
@ -211,9 +210,10 @@ def get_shout_followers(_, _info, slug: str = "", shout_id: int | None = None) -
|
|||
|
||||
except Exception as _exc:
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
logger.exception("Произошла ошибка в функции 'get_shout_followers'")
|
||||
return []
|
||||
|
||||
logger.debug(f"Функция 'get_shout_followers' завершена с {len(followers)} подписчиками")
|
||||
# logger.debug(f"Функция 'get_shout_followers' завершена с {len(followers)} подписчиками")
|
||||
return followers
|
||||
|
|
|
@ -288,15 +288,15 @@ async def create_reaction(_, info, reaction):
|
|||
try:
|
||||
with local_session() as session:
|
||||
shout = session.query(Shout).filter(Shout.id == shout_id).first()
|
||||
|
||||
|
||||
logger.debug(f"Loaded shout: {shout and shout.id}")
|
||||
|
||||
|
||||
if shout:
|
||||
reaction["created_by"] = author_id
|
||||
kind = reaction.get(
|
||||
"kind", ReactionKind.COMMENT.value if isinstance(reaction.get("body"), str) else None
|
||||
)
|
||||
|
||||
|
||||
logger.debug(f"Reaction kind: {kind}")
|
||||
|
||||
if kind in RATING_REACTIONS:
|
||||
|
@ -306,7 +306,7 @@ async def create_reaction(_, info, reaction):
|
|||
return error_result
|
||||
|
||||
rdict = await _create_reaction(session, info, shout, author_id, reaction)
|
||||
|
||||
|
||||
logger.debug(f"Created reaction result: {rdict}")
|
||||
|
||||
rdict["created_by"] = author_dict
|
||||
|
|
|
@ -70,11 +70,16 @@ def query_with_stat(info):
|
|||
q = q.join(main_author, main_author.id == Shout.created_by)
|
||||
q = q.add_columns(
|
||||
json_builder(
|
||||
"id", main_author.id,
|
||||
"name", main_author.name,
|
||||
"slug", main_author.slug,
|
||||
"pic", main_author.pic,
|
||||
"created_at", main_author.created_at
|
||||
"id",
|
||||
main_author.id,
|
||||
"name",
|
||||
main_author.name,
|
||||
"slug",
|
||||
main_author.slug,
|
||||
"pic",
|
||||
main_author.pic,
|
||||
"created_at",
|
||||
main_author.created_at,
|
||||
).label("main_author")
|
||||
)
|
||||
|
||||
|
@ -85,10 +90,7 @@ def query_with_stat(info):
|
|||
q = q.join(main_topic, main_topic.id == main_topic_join.topic)
|
||||
q = q.add_columns(
|
||||
json_builder(
|
||||
"id", main_topic.id,
|
||||
"title", main_topic.title,
|
||||
"slug", main_topic.slug,
|
||||
"is_main", main_topic_join.main
|
||||
"id", main_topic.id, "title", main_topic.title, "slug", main_topic.slug, "is_main", main_topic_join.main
|
||||
).label("main_topic")
|
||||
)
|
||||
|
||||
|
@ -97,17 +99,12 @@ def query_with_stat(info):
|
|||
select(
|
||||
ShoutTopic.shout,
|
||||
json_array_builder(
|
||||
json_builder(
|
||||
"id", Topic.id,
|
||||
"title", Topic.title,
|
||||
"slug", Topic.slug,
|
||||
"is_main", ShoutTopic.main
|
||||
)
|
||||
).label("topics")
|
||||
json_builder("id", Topic.id, "title", Topic.title, "slug", Topic.slug, "is_main", ShoutTopic.main)
|
||||
).label("topics"),
|
||||
)
|
||||
.outerjoin(Topic, ShoutTopic.topic == Topic.id)
|
||||
.where(ShoutTopic.shout == Shout.id)
|
||||
.group_by(ShoutTopic.shout)
|
||||
.group_by(ShoutTopic.shout)
|
||||
.subquery()
|
||||
)
|
||||
q = q.outerjoin(topics_subquery, topics_subquery.c.shout == Shout.id)
|
||||
|
@ -119,14 +116,20 @@ def query_with_stat(info):
|
|||
ShoutAuthor.shout,
|
||||
json_array_builder(
|
||||
json_builder(
|
||||
"id", Author.id,
|
||||
"name", Author.name,
|
||||
"slug", Author.slug,
|
||||
"pic", Author.pic,
|
||||
"caption", ShoutAuthor.caption,
|
||||
"created_at", Author.created_at
|
||||
"id",
|
||||
Author.id,
|
||||
"name",
|
||||
Author.name,
|
||||
"slug",
|
||||
Author.slug,
|
||||
"pic",
|
||||
Author.pic,
|
||||
"caption",
|
||||
ShoutAuthor.caption,
|
||||
"created_at",
|
||||
Author.created_at,
|
||||
)
|
||||
).label("authors")
|
||||
).label("authors"),
|
||||
)
|
||||
.outerjoin(Author, ShoutAuthor.author == Author.id)
|
||||
.where(ShoutAuthor.shout == Shout.id)
|
||||
|
@ -147,12 +150,12 @@ def query_with_stat(info):
|
|||
case(
|
||||
(Reaction.kind == ReactionKind.LIKE.value, 1),
|
||||
(Reaction.kind == ReactionKind.DISLIKE.value, -1),
|
||||
else_=0
|
||||
else_=0,
|
||||
)
|
||||
).filter(Reaction.reply_to.is_(None)).label("rating"),
|
||||
func.max(Reaction.created_at).filter(
|
||||
Reaction.reply_to.is_(None)
|
||||
).label("last_reacted_at")
|
||||
)
|
||||
.filter(Reaction.reply_to.is_(None))
|
||||
.label("rating"),
|
||||
func.max(Reaction.created_at).filter(Reaction.reply_to.is_(None)).label("last_reacted_at"),
|
||||
)
|
||||
.where(Reaction.deleted_at.is_(None))
|
||||
.group_by(Reaction.shout)
|
||||
|
@ -162,12 +165,15 @@ def query_with_stat(info):
|
|||
q = q.outerjoin(stats_subquery, stats_subquery.c.shout == Shout.id)
|
||||
q = q.add_columns(
|
||||
json_builder(
|
||||
"comments_count", stats_subquery.c.comments_count,
|
||||
"rating", stats_subquery.c.rating,
|
||||
"last_reacted_at", stats_subquery.c.last_reacted_at,
|
||||
"comments_count",
|
||||
stats_subquery.c.comments_count,
|
||||
"rating",
|
||||
stats_subquery.c.rating,
|
||||
"last_reacted_at",
|
||||
stats_subquery.c.last_reacted_at,
|
||||
).label("stat")
|
||||
)
|
||||
|
||||
|
||||
return q
|
||||
|
||||
|
||||
|
@ -223,16 +229,16 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
|
|||
elif isinstance(row.stat, dict):
|
||||
stat = row.stat
|
||||
viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0
|
||||
shout_dict["stat"] = {
|
||||
**stat,
|
||||
"viewed": viewed,
|
||||
"commented": stat.get("comments_count", 0)
|
||||
}
|
||||
shout_dict["stat"] = {**stat, "viewed": viewed, "commented": stat.get("comments_count", 0)}
|
||||
|
||||
if has_field(info, "main_topic") and hasattr(row, "main_topic"):
|
||||
shout_dict["main_topic"] = json.loads(row.main_topic) if isinstance(row.stat, str) else row.main_topic
|
||||
shout_dict["main_topic"] = (
|
||||
json.loads(row.main_topic) if isinstance(row.stat, str) else row.main_topic
|
||||
)
|
||||
if has_field(info, "authors") and hasattr(row, "authors"):
|
||||
shout_dict["authors"] = json.loads(row.authors) if isinstance(row.authors, str) else row.authors
|
||||
shout_dict["authors"] = (
|
||||
json.loads(row.authors) if isinstance(row.authors, str) else row.authors
|
||||
)
|
||||
if has_field(info, "topics") and hasattr(row, "topics"):
|
||||
shout_dict["topics"] = json.loads(row.topics) if isinstance(row.topics, str) else row.topics
|
||||
|
||||
|
@ -321,15 +327,11 @@ def apply_sorting(q, options):
|
|||
order_str = options.get("order_by")
|
||||
if order_str in ["rating", "comments_count", "last_reacted_at"]:
|
||||
query_order_by = desc(text(order_str)) if options.get("order_by_desc", True) else asc(text(order_str))
|
||||
q = (
|
||||
q.distinct(text(order_str), Shout.id) # DISTINCT ON включает поле сортировки
|
||||
.order_by(nulls_last(query_order_by), Shout.id)
|
||||
q = q.distinct(text(order_str), Shout.id).order_by( # DISTINCT ON включает поле сортировки
|
||||
nulls_last(query_order_by), Shout.id
|
||||
)
|
||||
else:
|
||||
q = (
|
||||
q.distinct(Shout.published_at, Shout.id)
|
||||
.order_by(Shout.published_at.desc(), Shout.id)
|
||||
)
|
||||
q = q.distinct(Shout.published_at, Shout.id).order_by(Shout.published_at.desc(), Shout.id)
|
||||
|
||||
return q
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ from granian.server import Granian
|
|||
from settings import PORT
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("started")
|
||||
|
||||
|
@ -20,7 +19,7 @@ if __name__ == "__main__":
|
|||
log_level=LogLevels.debug,
|
||||
backlog=2048,
|
||||
)
|
||||
|
||||
|
||||
granian_instance.serve()
|
||||
except Exception as error:
|
||||
logger.error(f"Granian error: {error}", exc_info=True)
|
||||
|
|
|
@ -25,8 +25,8 @@ if DB_URL.startswith("postgres"):
|
|||
pool_pre_ping=True, # Добавить проверку соединений
|
||||
connect_args={
|
||||
"sslmode": "disable",
|
||||
"connect_timeout": 40 # Добавить таймаут подключения
|
||||
}
|
||||
"connect_timeout": 40, # Добавить таймаут подключения
|
||||
},
|
||||
)
|
||||
else:
|
||||
engine = create_engine(DB_URL, echo=False, connect_args={"check_same_thread": False})
|
||||
|
|
|
@ -168,12 +168,7 @@ class SearchService:
|
|||
if self.client:
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self.client.index(
|
||||
index=self.index_name,
|
||||
id=str(shout.id),
|
||||
body=index_body
|
||||
),
|
||||
timeout=40.0
|
||||
self.client.index(index=self.index_name, id=str(shout.id), body=index_body), timeout=40.0
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Indexing timeout for shout {shout.id}")
|
||||
|
|
Loading…
Reference in New Issue
Block a user