cachedep-fix+orjson+fmt
All checks were successful
Deploy on push / deploy (push) Successful in 1m16s

This commit is contained in:
Untone 2025-03-20 11:55:21 +03:00
parent a1781b3800
commit 247fc98760
14 changed files with 245 additions and 75 deletions

View File

@ -1,5 +1,7 @@
#### [0.4.12] - 2025-02-12 #### [0.4.12] - 2025-03-19
- `delete_reaction` detects comments and uses `deleted_at` update - `delete_reaction` detects comments and uses `deleted_at` update
- `check_to_unfeature` etc. update
- dogpile dep in `services/memorycache.py` optimized
#### [0.4.11] - 2025-02-12 #### [0.4.11] - 2025-02-12
- `create_draft` resolver requires draft_id fixed - `create_draft` resolver requires draft_id fixed

54
cache/cache.py vendored
View File

@ -1,7 +1,7 @@
import asyncio import asyncio
import json
from typing import List from typing import List
import orjson
from sqlalchemy import and_, join, select from sqlalchemy import and_, join, select
from orm.author import Author, AuthorFollower from orm.author import Author, AuthorFollower
@ -35,7 +35,7 @@ CACHE_KEYS = {
# Cache topic data # Cache topic data
async def cache_topic(topic: dict): async def cache_topic(topic: dict):
payload = json.dumps(topic, cls=CustomJSONEncoder) payload = orjson.dumps(topic, cls=CustomJSONEncoder)
await asyncio.gather( await asyncio.gather(
redis_operation("SET", f"topic:id:{topic['id']}", payload), redis_operation("SET", f"topic:id:{topic['id']}", payload),
redis_operation("SET", f"topic:slug:{topic['slug']}", payload), redis_operation("SET", f"topic:slug:{topic['slug']}", payload),
@ -44,7 +44,7 @@ async def cache_topic(topic: dict):
# Cache author data # Cache author data
async def cache_author(author: dict): async def cache_author(author: dict):
payload = json.dumps(author, cls=CustomJSONEncoder) payload = orjson.dumps(author, cls=CustomJSONEncoder)
await asyncio.gather( await asyncio.gather(
redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])), redis_operation("SET", f"author:user:{author['user'].strip()}", str(author["id"])),
redis_operation("SET", f"author:id:{author['id']}", payload), redis_operation("SET", f"author:id:{author['id']}", payload),
@ -55,13 +55,13 @@ async def cache_author(author: dict):
async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True): async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True):
key = f"author:follows-{entity_type}s:{follower_id}" key = f"author:follows-{entity_type}s:{follower_id}"
follows_str = await redis_operation("GET", key) follows_str = await redis_operation("GET", key)
follows = json.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] follows = orjson.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type]
if is_insert: if is_insert:
if entity_id not in follows: if entity_id not in follows:
follows.append(entity_id) follows.append(entity_id)
else: else:
follows = [eid for eid in follows if eid != entity_id] follows = [eid for eid in follows if eid != entity_id]
await redis_operation("SET", key, json.dumps(follows, cls=CustomJSONEncoder)) await redis_operation("SET", key, orjson.dumps(follows, cls=CustomJSONEncoder))
await update_follower_stat(follower_id, entity_type, len(follows)) await update_follower_stat(follower_id, entity_type, len(follows))
@ -69,7 +69,7 @@ async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_i
async def update_follower_stat(follower_id, entity_type, count): async def update_follower_stat(follower_id, entity_type, count):
follower_key = f"author:id:{follower_id}" follower_key = f"author:id:{follower_id}"
follower_str = await redis_operation("GET", follower_key) follower_str = await redis_operation("GET", follower_key)
follower = json.loads(follower_str) if follower_str else None follower = orjson.loads(follower_str) if follower_str else None
if follower: if follower:
follower["stat"] = {f"{entity_type}s": count} follower["stat"] = {f"{entity_type}s": count}
await cache_author(follower) await cache_author(follower)
@ -80,7 +80,7 @@ async def get_cached_author(author_id: int, get_with_stat):
author_key = f"author:id:{author_id}" author_key = f"author:id:{author_id}"
result = await redis_operation("GET", author_key) result = await redis_operation("GET", author_key)
if result: if result:
return json.loads(result) return orjson.loads(result)
# Load from database if not found in cache # Load from database if not found in cache
q = select(Author).where(Author.id == author_id) q = select(Author).where(Author.id == author_id)
authors = get_with_stat(q) authors = get_with_stat(q)
@ -105,14 +105,14 @@ async def get_cached_topic(topic_id: int):
topic_key = f"topic:id:{topic_id}" topic_key = f"topic:id:{topic_id}"
cached_topic = await redis_operation("GET", topic_key) cached_topic = await redis_operation("GET", topic_key)
if cached_topic: if cached_topic:
return json.loads(cached_topic) return orjson.loads(cached_topic)
# If not in cache, fetch from the database # If not in cache, fetch from the database
with local_session() as session: with local_session() as session:
topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none() topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none()
if topic: if topic:
topic_dict = topic.dict() topic_dict = topic.dict()
await redis_operation("SET", topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) await redis_operation("SET", topic_key, orjson.dumps(topic_dict, cls=CustomJSONEncoder))
return topic_dict return topic_dict
return None return None
@ -123,7 +123,7 @@ async def get_cached_topic_by_slug(slug: str, get_with_stat):
topic_key = f"topic:slug:{slug}" topic_key = f"topic:slug:{slug}"
result = await redis_operation("GET", topic_key) result = await redis_operation("GET", topic_key)
if result: if result:
return json.loads(result) return orjson.loads(result)
# Load from database if not found in cache # Load from database if not found in cache
topic_query = select(Topic).where(Topic.slug == slug) topic_query = select(Topic).where(Topic.slug == slug)
topics = get_with_stat(topic_query) topics = get_with_stat(topic_query)
@ -139,7 +139,7 @@ async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]:
# Fetch all author data concurrently # Fetch all author data concurrently
keys = [f"author:id:{author_id}" for author_id in author_ids] keys = [f"author:id:{author_id}" for author_id in author_ids]
results = await asyncio.gather(*(redis_operation("GET", key) for key in keys)) results = await asyncio.gather(*(redis_operation("GET", key) for key in keys))
authors = [json.loads(result) if result else None for result in results] authors = [orjson.loads(result) if result else None for result in results]
# Load missing authors from database and cache # Load missing authors from database and cache
missing_indices = [index for index, author in enumerate(authors) if author is None] missing_indices = [index for index, author in enumerate(authors) if author is None]
if missing_indices: if missing_indices:
@ -168,7 +168,7 @@ async def get_cached_topic_followers(topic_id: int):
cached = await redis_operation("GET", cache_key) cached = await redis_operation("GET", cache_key)
if cached: if cached:
followers_ids = json.loads(cached) followers_ids = orjson.loads(cached)
logger.debug(f"Found {len(followers_ids)} cached followers for topic #{topic_id}") logger.debug(f"Found {len(followers_ids)} cached followers for topic #{topic_id}")
return await get_cached_authors_by_ids(followers_ids) return await get_cached_authors_by_ids(followers_ids)
@ -181,7 +181,7 @@ async def get_cached_topic_followers(topic_id: int):
.all() .all()
] ]
await redis_operation("SETEX", cache_key, value=json.dumps(followers_ids), ttl=CACHE_TTL) await redis_operation("SETEX", cache_key, value=orjson.dumps(followers_ids), ttl=CACHE_TTL)
followers = await get_cached_authors_by_ids(followers_ids) followers = await get_cached_authors_by_ids(followers_ids)
logger.debug(f"Cached {len(followers)} followers for topic #{topic_id}") logger.debug(f"Cached {len(followers)} followers for topic #{topic_id}")
return followers return followers
@ -196,7 +196,7 @@ async def get_cached_author_followers(author_id: int):
# Check cache for data # Check cache for data
cached = await redis_operation("GET", f"author:followers:{author_id}") cached = await redis_operation("GET", f"author:followers:{author_id}")
if cached: if cached:
followers_ids = json.loads(cached) followers_ids = orjson.loads(cached)
followers = await get_cached_authors_by_ids(followers_ids) followers = await get_cached_authors_by_ids(followers_ids)
logger.debug(f"Cached followers for author #{author_id}: {len(followers)}") logger.debug(f"Cached followers for author #{author_id}: {len(followers)}")
return followers return followers
@ -210,7 +210,7 @@ async def get_cached_author_followers(author_id: int):
.filter(AuthorFollower.author == author_id, Author.id != author_id) .filter(AuthorFollower.author == author_id, Author.id != author_id)
.all() .all()
] ]
await redis_operation("SET", f"author:followers:{author_id}", json.dumps(followers_ids)) await redis_operation("SET", f"author:followers:{author_id}", orjson.dumps(followers_ids))
followers = await get_cached_authors_by_ids(followers_ids) followers = await get_cached_authors_by_ids(followers_ids)
return followers return followers
@ -220,7 +220,7 @@ async def get_cached_follower_authors(author_id: int):
# Attempt to retrieve authors from cache # Attempt to retrieve authors from cache
cached = await redis_operation("GET", f"author:follows-authors:{author_id}") cached = await redis_operation("GET", f"author:follows-authors:{author_id}")
if cached: if cached:
authors_ids = json.loads(cached) authors_ids = orjson.loads(cached)
else: else:
# Query authors from database # Query authors from database
with local_session() as session: with local_session() as session:
@ -232,7 +232,7 @@ async def get_cached_follower_authors(author_id: int):
.where(AuthorFollower.follower == author_id) .where(AuthorFollower.follower == author_id)
).all() ).all()
] ]
await redis_operation("SET", f"author:follows-authors:{author_id}", json.dumps(authors_ids)) await redis_operation("SET", f"author:follows-authors:{author_id}", orjson.dumps(authors_ids))
authors = await get_cached_authors_by_ids(authors_ids) authors = await get_cached_authors_by_ids(authors_ids)
return authors return authors
@ -243,7 +243,7 @@ async def get_cached_follower_topics(author_id: int):
# Attempt to retrieve topics from cache # Attempt to retrieve topics from cache
cached = await redis_operation("GET", f"author:follows-topics:{author_id}") cached = await redis_operation("GET", f"author:follows-topics:{author_id}")
if cached: if cached:
topics_ids = json.loads(cached) topics_ids = orjson.loads(cached)
else: else:
# Load topics from database and cache them # Load topics from database and cache them
with local_session() as session: with local_session() as session:
@ -254,13 +254,13 @@ async def get_cached_follower_topics(author_id: int):
.where(TopicFollower.follower == author_id) .where(TopicFollower.follower == author_id)
.all() .all()
] ]
await redis_operation("SET", f"author:follows-topics:{author_id}", json.dumps(topics_ids)) await redis_operation("SET", f"author:follows-topics:{author_id}", orjson.dumps(topics_ids))
topics = [] topics = []
for topic_id in topics_ids: for topic_id in topics_ids:
topic_str = await redis_operation("GET", f"topic:id:{topic_id}") topic_str = await redis_operation("GET", f"topic:id:{topic_id}")
if topic_str: if topic_str:
topic = json.loads(topic_str) topic = orjson.loads(topic_str)
if topic and topic not in topics: if topic and topic not in topics:
topics.append(topic) topics.append(topic)
@ -285,7 +285,7 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat):
# If ID is found, get full author data by ID # If ID is found, get full author data by ID
author_data = await redis_operation("GET", f"author:id:{author_id}") author_data = await redis_operation("GET", f"author:id:{author_id}")
if author_data: if author_data:
return json.loads(author_data) return orjson.loads(author_data)
# If data is not found in cache, query the database # If data is not found in cache, query the database
author_query = select(Author).where(Author.user == user_id) author_query = select(Author).where(Author.user == user_id)
@ -296,7 +296,7 @@ async def get_cached_author_by_user_id(user_id: str, get_with_stat):
author_dict = author.dict() author_dict = author.dict()
await asyncio.gather( await asyncio.gather(
redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)), redis_operation("SET", f"author:user:{user_id.strip()}", str(author.id)),
redis_operation("SET", f"author:id:{author.id}", json.dumps(author_dict)), redis_operation("SET", f"author:id:{author.id}", orjson.dumps(author_dict)),
) )
return author_dict return author_dict
@ -319,7 +319,7 @@ async def get_cached_topic_authors(topic_id: int):
rkey = f"topic:authors:{topic_id}" rkey = f"topic:authors:{topic_id}"
cached_authors_ids = await redis_operation("GET", rkey) cached_authors_ids = await redis_operation("GET", rkey)
if cached_authors_ids: if cached_authors_ids:
authors_ids = json.loads(cached_authors_ids) authors_ids = orjson.loads(cached_authors_ids)
else: else:
# If cache is empty, get data from the database # If cache is empty, get data from the database
with local_session() as session: with local_session() as session:
@ -331,7 +331,7 @@ async def get_cached_topic_authors(topic_id: int):
) )
authors_ids = [author_id for (author_id,) in session.execute(query).all()] authors_ids = [author_id for (author_id,) in session.execute(query).all()]
# Cache the retrieved author IDs # Cache the retrieved author IDs
await redis_operation("SET", rkey, json.dumps(authors_ids)) await redis_operation("SET", rkey, orjson.dumps(authors_ids))
# Retrieve full author details from cached IDs # Retrieve full author details from cached IDs
if authors_ids: if authors_ids:
@ -378,7 +378,7 @@ async def invalidate_shouts_cache(cache_keys: List[str]):
async def cache_topic_shouts(topic_id: int, shouts: List[dict]): async def cache_topic_shouts(topic_id: int, shouts: List[dict]):
"""Кэширует список публикаций для темы""" """Кэширует список публикаций для темы"""
key = f"topic_shouts_{topic_id}" key = f"topic_shouts_{topic_id}"
payload = json.dumps(shouts, cls=CustomJSONEncoder) payload = orjson.dumps(shouts, cls=CustomJSONEncoder)
await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL) await redis_operation("SETEX", key, value=payload, ttl=CACHE_TTL)
@ -387,7 +387,7 @@ async def get_cached_topic_shouts(topic_id: int) -> List[dict]:
key = f"topic_shouts_{topic_id}" key = f"topic_shouts_{topic_id}"
cached = await redis_operation("GET", key) cached = await redis_operation("GET", key)
if cached: if cached:
return json.loads(cached) return orjson.loads(cached)
return None return None
@ -467,7 +467,7 @@ async def get_cached_entity(entity_type: str, entity_id: int, get_method, cache_
key = f"{entity_type}:id:{entity_id}" key = f"{entity_type}:id:{entity_id}"
cached = await redis_operation("GET", key) cached = await redis_operation("GET", key)
if cached: if cached:
return json.loads(cached) return orjson.loads(cached)
entity = await get_method(entity_id) entity = await get_method(entity_id)
if entity: if entity:

176
cache/memorycache.py vendored
View File

@ -1,11 +1,169 @@
from dogpile.cache import make_region """
Модуль для кеширования данных с использованием Redis.
Предоставляет API, совместимый с dogpile.cache для поддержки обратной совместимости.
"""
from settings import REDIS_URL import functools
import hashlib
import inspect
import logging
import pickle
from typing import Callable, Optional
# Создание региона кэша с TTL import orjson
cache_region = make_region()
cache_region.configure( from services.redis import redis
"dogpile.cache.redis", from utils.encoders import CustomJSONEncoder
arguments={"url": f"{REDIS_URL}/1"},
expiration_time=3600, # Cache expiration time in seconds logger = logging.getLogger(__name__)
)
DEFAULT_TTL = 300 # время жизни кеша в секундах (5 минут)
class RedisCache:
"""
Класс, предоставляющий API, совместимый с dogpile.cache, но использующий Redis.
Примеры:
>>> cache_region = RedisCache()
>>> @cache_region.cache_on_arguments("my_key")
... def my_func(arg1, arg2):
... return arg1 + arg2
"""
def __init__(self, ttl: int = DEFAULT_TTL):
"""
Инициализация объекта кеша.
Args:
ttl: Время жизни кеша в секундах
"""
self.ttl = ttl
def cache_on_arguments(self, cache_key: Optional[str] = None) -> Callable:
"""
Декоратор для кеширования результатов функций с использованием Redis.
Args:
cache_key: Опциональный базовый ключ кеша. Если не указан, генерируется из сигнатуры функции.
Returns:
Декоратор для кеширования функции
Примеры:
>>> @cache_region.cache_on_arguments("users")
... def get_users():
... return db.query(User).all()
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Генерация ключа кеша
key = self._generate_cache_key(func, cache_key, *args, **kwargs)
# Попытка получить данные из кеша
cached_data = await redis.get(key)
if cached_data:
try:
return orjson.loads(cached_data)
except Exception:
# Если не удалось десериализовать как JSON, попробуем как pickle
return pickle.loads(cached_data.encode())
# Вызов оригинальной функции, если данных в кеше нет
result = func(*args, **kwargs)
# Сохранение результата в кеш
try:
# Пытаемся сериализовать как JSON
serialized = orjson.dumps(result, cls=CustomJSONEncoder)
except (TypeError, ValueError):
# Если не удалось, используем pickle
serialized = pickle.dumps(result).decode()
await redis.set(key, serialized, ex=self.ttl)
return result
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
# Для функций, которые не являются корутинами
# Генерация ключа кеша
key = self._generate_cache_key(func, cache_key, *args, **kwargs)
# Синхронная версия не использует await, поэтому результат всегда вычисляется
result = func(*args, **kwargs)
# Асинхронно записываем в кэш (будет выполнено позже)
try:
import asyncio
serialized = orjson.dumps(result, cls=CustomJSONEncoder)
asyncio.create_task(redis.set(key, serialized, ex=self.ttl))
except Exception as e:
logger.error(f"Ошибка при кешировании результата: {e}")
return result
# Возвращаем асинхронный или синхронный враппер в зависимости от типа функции
if inspect.iscoroutinefunction(func):
return wrapper
else:
return sync_wrapper
return decorator
def _generate_cache_key(self, func: Callable, base_key: Optional[str], *args, **kwargs) -> str:
"""
Генерирует ключ кеша на основе функции и её аргументов.
Args:
func: Кешируемая функция
base_key: Базовый ключ кеша
*args: Позиционные аргументы функции
**kwargs: Именованные аргументы функции
Returns:
Строковый ключ для кеша
"""
if base_key:
key_prefix = f"cache:{base_key}"
else:
key_prefix = f"cache:{func.__module__}.{func.__name__}"
# Создаем хеш аргументов
arg_hash = hashlib.md5()
# Добавляем позиционные аргументы
for arg in args:
try:
arg_hash.update(str(arg).encode())
except Exception:
arg_hash.update(str(id(arg)).encode())
# Добавляем именованные аргументы (сортируем для детерминированности)
for k in sorted(kwargs.keys()):
try:
arg_hash.update(f"{k}:{kwargs[k]}".encode())
except Exception:
arg_hash.update(f"{k}:{id(kwargs[k])}".encode())
return f"{key_prefix}:{arg_hash.hexdigest()}"
def invalidate(self, func: Callable, *args, **kwargs) -> None:
"""
Инвалидирует (удаляет) кеш для конкретной функции с конкретными аргументами.
Args:
func: Кешированная функция
*args: Позиционные аргументы функции
**kwargs: Именованные аргументы функции
"""
key = self._generate_cache_key(func, None, *args, **kwargs)
import asyncio
asyncio.create_task(redis.execute("DEL", key))
# Экземпляр класса RedisCache для использования в коде
cache_region = RedisCache()

14
cache/precache.py vendored
View File

@ -1,6 +1,6 @@
import asyncio import asyncio
import json
import orjson
from sqlalchemy import and_, join, select from sqlalchemy import and_, join, select
from cache.cache import cache_author, cache_topic from cache.cache import cache_author, cache_topic
@ -21,7 +21,7 @@ async def precache_authors_followers(author_id, session):
result = session.execute(followers_query) result = session.execute(followers_query)
authors_followers.update(row[0] for row in result if row[0]) authors_followers.update(row[0] for row in result if row[0])
followers_payload = json.dumps(list(authors_followers), cls=CustomJSONEncoder) followers_payload = orjson.dumps(list(authors_followers), cls=CustomJSONEncoder)
await redis.execute("SET", f"author:followers:{author_id}", followers_payload) await redis.execute("SET", f"author:followers:{author_id}", followers_payload)
@ -35,9 +35,9 @@ async def precache_authors_follows(author_id, session):
follows_authors = {row[0] for row in session.execute(follows_authors_query) if row[0]} follows_authors = {row[0] for row in session.execute(follows_authors_query) if row[0]}
follows_shouts = {row[0] for row in session.execute(follows_shouts_query) if row[0]} follows_shouts = {row[0] for row in session.execute(follows_shouts_query) if row[0]}
topics_payload = json.dumps(list(follows_topics), cls=CustomJSONEncoder) topics_payload = orjson.dumps(list(follows_topics), cls=CustomJSONEncoder)
authors_payload = json.dumps(list(follows_authors), cls=CustomJSONEncoder) authors_payload = orjson.dumps(list(follows_authors), cls=CustomJSONEncoder)
shouts_payload = json.dumps(list(follows_shouts), cls=CustomJSONEncoder) shouts_payload = orjson.dumps(list(follows_shouts), cls=CustomJSONEncoder)
await asyncio.gather( await asyncio.gather(
redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload), redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload),
@ -62,7 +62,7 @@ async def precache_topics_authors(topic_id: int, session):
) )
topic_authors = {row[0] for row in session.execute(topic_authors_query) if row[0]} topic_authors = {row[0] for row in session.execute(topic_authors_query) if row[0]}
authors_payload = json.dumps(list(topic_authors), cls=CustomJSONEncoder) authors_payload = orjson.dumps(list(topic_authors), cls=CustomJSONEncoder)
await redis.execute("SET", f"topic:authors:{topic_id}", authors_payload) await redis.execute("SET", f"topic:authors:{topic_id}", authors_payload)
@ -71,7 +71,7 @@ async def precache_topics_followers(topic_id: int, session):
followers_query = select(TopicFollower.follower).where(TopicFollower.topic == topic_id) followers_query = select(TopicFollower.follower).where(TopicFollower.topic == topic_id)
topic_followers = {row[0] for row in session.execute(followers_query) if row[0]} topic_followers = {row[0] for row in session.execute(followers_query) if row[0]}
followers_payload = json.dumps(list(topic_followers), cls=CustomJSONEncoder) followers_payload = orjson.dumps(list(topic_followers), cls=CustomJSONEncoder)
await redis.execute("SET", f"topic:followers:{topic_id}", followers_payload) await redis.execute("SET", f"topic:followers:{topic_id}", followers_payload)

View File

@ -14,6 +14,16 @@
- Автоматическое определение сервера авторизации - Автоматическое определение сервера авторизации
- Корректная обработка CORS для всех поддерживаемых доменов - Корректная обработка CORS для всех поддерживаемых доменов
## Система кеширования
- Redis используется в качестве основного механизма кеширования
- Поддержка как синхронных, так и асинхронных функций в декораторе cache_on_arguments
- Автоматическая сериализация/десериализация данных в JSON с использованием CustomJSONEncoder
- Резервная сериализация через pickle для сложных объектов
- Генерация уникальных ключей кеша на основе сигнатуры функции и переданных аргументов
- Настраиваемое время жизни кеша (TTL)
- Возможность ручной инвалидации кеша для конкретных функций и аргументов
## Webhooks ## Webhooks
- Автоматическая регистрация вебхука для события user.login - Автоматическая регистрация вебхука для события user.login

View File

@ -2,7 +2,6 @@
bcrypt bcrypt
authlib authlib
passlib passlib
opensearch-py opensearch-py
google-analytics-data google-analytics-data
colorlog colorlog
@ -14,5 +13,5 @@ starlette
gql gql
ariadne ariadne
granian granian
orjson
pydantic pydantic

View File

@ -1,6 +1,6 @@
import json
import time import time
import orjson
from sqlalchemy import and_, desc, select from sqlalchemy import and_, desc, select
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from sqlalchemy.sql.functions import coalesce from sqlalchemy.sql.functions import coalesce
@ -106,7 +106,7 @@ async def get_my_shout(_, info, shout_id: int):
if hasattr(shout, "media") and shout.media: if hasattr(shout, "media") and shout.media:
if isinstance(shout.media, str): if isinstance(shout.media, str):
try: try:
shout.media = json.loads(shout.media) shout.media = orjson.loads(shout.media)
except Exception as e: except Exception as e:
logger.error(f"Error parsing shout media: {e}") logger.error(f"Error parsing shout media: {e}")
shout.media = [] shout.media = []

View File

@ -1,7 +1,7 @@
import json
import time import time
from typing import List, Tuple from typing import List, Tuple
import orjson
from sqlalchemy import and_, select from sqlalchemy import and_, select
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
@ -115,7 +115,7 @@ def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, o
if (groups_amount + offset) >= limit: if (groups_amount + offset) >= limit:
break break
payload = json.loads(str(notification.payload)) payload = orjson.loads(str(notification.payload))
if str(notification.entity) == NotificationEntity.SHOUT.value: if str(notification.entity) == NotificationEntity.SHOUT.value:
shout = payload shout = payload
@ -177,7 +177,7 @@ def get_notifications_grouped(author_id: int, after: int = 0, limit: int = 10, o
elif str(notification.entity) == "follower": elif str(notification.entity) == "follower":
thread_id = "followers" thread_id = "followers"
follower = json.loads(payload) follower = orjson.loads(payload)
group = groups_by_thread.get(thread_id) group = groups_by_thread.get(thread_id)
if group: if group:
if str(notification.action) == "follow": if str(notification.action) == "follow":
@ -293,11 +293,11 @@ async def notifications_seen_thread(_, info, thread: str, after: int):
) )
exclude = set() exclude = set()
for nr in removed_reaction_notifications: for nr in removed_reaction_notifications:
reaction = json.loads(str(nr.payload)) reaction = orjson.loads(str(nr.payload))
reaction_id = reaction.get("id") reaction_id = reaction.get("id")
exclude.add(reaction_id) exclude.add(reaction_id)
for n in new_reaction_notifications: for n in new_reaction_notifications:
reaction = json.loads(str(n.payload)) reaction = orjson.loads(str(n.payload))
reaction_id = reaction.get("id") reaction_id = reaction.get("id")
if ( if (
reaction_id not in exclude reaction_id not in exclude

View File

@ -1,5 +1,4 @@
import json import orjson
from graphql import GraphQLResolveInfo from graphql import GraphQLResolveInfo
from sqlalchemy import and_, nulls_last, text from sqlalchemy import and_, nulls_last, text
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
@ -222,7 +221,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
if has_field(info, "stat"): if has_field(info, "stat"):
stat = {} stat = {}
if isinstance(row.stat, str): if isinstance(row.stat, str):
stat = json.loads(row.stat) stat = orjson.loads(row.stat)
elif isinstance(row.stat, dict): elif isinstance(row.stat, dict):
stat = row.stat stat = row.stat
viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0 viewed = ViewedStorage.get_shout(shout_id=shout_id) or 0
@ -231,7 +230,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
# Обработка main_topic и topics # Обработка main_topic и topics
topics = None topics = None
if has_field(info, "topics") and hasattr(row, "topics"): if has_field(info, "topics") and hasattr(row, "topics"):
topics = json.loads(row.topics) if isinstance(row.topics, str) else row.topics topics = orjson.loads(row.topics) if isinstance(row.topics, str) else row.topics
# logger.debug(f"Shout#{shout_id} topics: {topics}") # logger.debug(f"Shout#{shout_id} topics: {topics}")
shout_dict["topics"] = topics shout_dict["topics"] = topics
@ -240,7 +239,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
if hasattr(row, "main_topic"): if hasattr(row, "main_topic"):
# logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}") # logger.debug(f"Raw main_topic for shout#{shout_id}: {row.main_topic}")
main_topic = ( main_topic = (
json.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic orjson.loads(row.main_topic) if isinstance(row.main_topic, str) else row.main_topic
) )
# logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}") # logger.debug(f"Parsed main_topic for shout#{shout_id}: {main_topic}")
@ -260,7 +259,7 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
if has_field(info, "authors") and hasattr(row, "authors"): if has_field(info, "authors") and hasattr(row, "authors"):
shout_dict["authors"] = ( shout_dict["authors"] = (
json.loads(row.authors) if isinstance(row.authors, str) else row.authors orjson.loads(row.authors) if isinstance(row.authors, str) else row.authors
) )
if has_field(info, "media") and shout.media: if has_field(info, "media") and shout.media:
@ -268,8 +267,8 @@ def get_shouts_with_links(info, q, limit=20, offset=0):
media_data = shout.media media_data = shout.media
if isinstance(media_data, str): if isinstance(media_data, str):
try: try:
media_data = json.loads(media_data) media_data = orjson.loads(media_data)
except json.JSONDecodeError: except orjson.JSONDecodeError:
media_data = [] media_data = []
shout_dict["media"] = [media_data] if isinstance(media_data, dict) else media_data shout_dict["media"] = [media_data] if isinstance(media_data, dict) else media_data

View File

@ -1,10 +1,10 @@
import json
import math import math
import time import time
import traceback import traceback
import warnings import warnings
from typing import Any, Callable, Dict, TypeVar from typing import Any, Callable, Dict, TypeVar
import orjson
import sqlalchemy import sqlalchemy
from sqlalchemy import ( from sqlalchemy import (
JSON, JSON,
@ -84,8 +84,8 @@ class Base(declarative_base()):
# Check if the value is JSON and decode it if necessary # Check if the value is JSON and decode it if necessary
if isinstance(value, (str, bytes)) and isinstance(self.__table__.columns[column_name].type, JSON): if isinstance(value, (str, bytes)) and isinstance(self.__table__.columns[column_name].type, JSON):
try: try:
data[column_name] = json.loads(value) data[column_name] = orjson.loads(value)
except (TypeError, json.JSONDecodeError) as e: except (TypeError, orjson.JSONDecodeError) as e:
logger.error(f"Error decoding JSON for column '{column_name}': {e}") logger.error(f"Error decoding JSON for column '{column_name}': {e}")
data[column_name] = value data[column_name] = value
else: else:

View File

@ -1,4 +1,4 @@
import json import orjson
from orm.notification import Notification from orm.notification import Notification
from services.db import local_session from services.db import local_session
@ -18,7 +18,7 @@ async def notify_reaction(reaction, action: str = "create"):
data = {"payload": reaction, "action": action} data = {"payload": reaction, "action": action}
try: try:
save_notification(action, channel_name, data.get("payload")) save_notification(action, channel_name, data.get("payload"))
await redis.publish(channel_name, json.dumps(data)) await redis.publish(channel_name, orjson.dumps(data))
except Exception as e: except Exception as e:
logger.error(f"Failed to publish to channel {channel_name}: {e}") logger.error(f"Failed to publish to channel {channel_name}: {e}")
@ -28,7 +28,7 @@ async def notify_shout(shout, action: str = "update"):
data = {"payload": shout, "action": action} data = {"payload": shout, "action": action}
try: try:
save_notification(action, channel_name, data.get("payload")) save_notification(action, channel_name, data.get("payload"))
await redis.publish(channel_name, json.dumps(data)) await redis.publish(channel_name, orjson.dumps(data))
except Exception as e: except Exception as e:
logger.error(f"Failed to publish to channel {channel_name}: {e}") logger.error(f"Failed to publish to channel {channel_name}: {e}")
@ -43,7 +43,7 @@ async def notify_follower(follower: dict, author_id: int, action: str = "follow"
save_notification(action, channel_name, data.get("payload")) save_notification(action, channel_name, data.get("payload"))
# Convert data to JSON string # Convert data to JSON string
json_data = json.dumps(data) json_data = orjson.dumps(data)
# Ensure the data is not empty before publishing # Ensure the data is not empty before publishing
if json_data: if json_data:

View File

@ -1,8 +1,8 @@
import asyncio import asyncio
import json
import logging import logging
import os import os
import orjson
from opensearchpy import OpenSearch from opensearchpy import OpenSearch
from services.redis import redis from services.redis import redis
@ -142,7 +142,7 @@ class SearchService:
# Проверка и обновление структуры индекса, если необходимо # Проверка и обновление структуры индекса, если необходимо
result = self.client.indices.get_mapping(index=self.index_name) result = self.client.indices.get_mapping(index=self.index_name)
if isinstance(result, str): if isinstance(result, str):
result = json.loads(result) result = orjson.loads(result)
if isinstance(result, dict): if isinstance(result, dict):
mapping = result.get(self.index_name, {}).get("mappings") mapping = result.get(self.index_name, {}).get("mappings")
logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}") logger.info(f"Найдена структура индексации: {mapping['properties'].keys()}")
@ -210,7 +210,7 @@ class SearchService:
"SETEX", "SETEX",
redis_key, redis_key,
REDIS_TTL, REDIS_TTL,
json.dumps(results, cls=CustomJSONEncoder), orjson.dumps(results, cls=CustomJSONEncoder),
) )
return results return results
return [] return []

View File

@ -1,10 +1,11 @@
import asyncio import asyncio
import json
import os import os
import time import time
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Dict from typing import Dict
import orjson
# ga # ga
from google.analytics.data_v1beta import BetaAnalyticsDataClient from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import ( from google.analytics.data_v1beta.types import (
@ -84,7 +85,7 @@ class ViewedStorage:
logger.warn(f" * {viewfile_path} is too old: {self.start_date}") logger.warn(f" * {viewfile_path} is too old: {self.start_date}")
with open(viewfile_path, "r") as file: with open(viewfile_path, "r") as file:
precounted_views = json.load(file) precounted_views = orjson.load(file)
self.precounted_by_slug.update(precounted_views) self.precounted_by_slug.update(precounted_views)
logger.info(f" * {len(precounted_views)} shouts with views was loaded.") logger.info(f" * {len(precounted_views)} shouts with views was loaded.")

View File

@ -1,8 +1,9 @@
import json
from decimal import Decimal from decimal import Decimal
import orjson
class CustomJSONEncoder(json.JSONEncoder):
class CustomJSONEncoder(orjson.JSONEncoder):
def default(self, obj): def default(self, obj):
if isinstance(obj, Decimal): if isinstance(obj, Decimal):
return str(obj) return str(obj)