diff --git a/README.md b/README.md index 9149a0f2..0d17826e 100644 --- a/README.md +++ b/README.md @@ -28,26 +28,26 @@ Backend service providing GraphQL API for content management system with reactio ## Tech Stack -- **(Python)[https://www.python.org/]** 3.12+ +- [Python](https://www.python.org/) 3.12+ - **GraphQL** with [Ariadne](https://ariadnegraphql.org/) -- **(SQLAlchemy)[https://docs.sqlalchemy.org/en/20/orm/]** -- **(PostgreSQL)[https://www.postgresql.org/]/(SQLite)[https://www.sqlite.org/]** support -- **(Starlette)[https://www.starlette.io/]** for ASGI server -- **(Redis)[https://redis.io/]** for caching +- [SQLAlchemy](https://docs.sqlalchemy.org/en/20/orm/) +- [PostgreSQL](https://www.postgresql.org/)/[SQLite](https://www.sqlite.org/) support +- [Starlette](https://www.starlette.io/) for ASGI server +- [Redis](https://redis.io/) for caching ## Development ### Prepare environment: ```shell -mkdir .venv python3.12 -m venv venv source venv/bin/activate +pip install -r requirements.dev.txt ``` ### Run server -First, certifcates are required to run the server. +First, certificates are required to run the server with HTTPS. ```shell mkcert -install diff --git a/auth/decorators.py b/auth/decorators.py index de971da5..1f58d209 100644 --- a/auth/decorators.py +++ b/auth/decorators.py @@ -164,8 +164,7 @@ async def validate_graphql_context(info: GraphQLResolveInfo) -> None: auth_cred = request.scope.get("auth") if isinstance(auth_cred, AuthCredentials) and auth_cred.logged_in: logger.debug(f"[decorators] Пользователь авторизован через scope: {auth_cred.author_id}") - # Устанавливаем auth в request для дальнейшего использования - request.auth = auth_cred + # Больше не устанавливаем request.auth напрямую return # Если авторизации нет ни в auth, ни в scope, пробуем получить и проверить токен @@ -189,7 +188,7 @@ async def validate_graphql_context(info: GraphQLResolveInfo) -> None: msg = f"Unauthorized - {error_msg}" raise GraphQLError(msg) - # Если все проверки пройдены, создаем AuthCredentials и устанавливаем в request.auth + # Если все проверки пройдены, создаем AuthCredentials и устанавливаем в request.scope with local_session() as session: try: author = session.query(Author).filter(Author.id == auth_state.author_id).one() @@ -206,13 +205,18 @@ async def validate_graphql_context(info: GraphQLResolveInfo) -> None: token=auth_state.token, ) - # Устанавливаем auth в request - request.auth = auth_cred - logger.debug(f"[decorators] Токен успешно проверен и установлен для пользователя {auth_state.author_id}") + # Устанавливаем auth в request.scope вместо прямого присваивания к request.auth + if hasattr(request, "scope") and isinstance(request.scope, dict): + request.scope["auth"] = auth_cred + logger.debug( + f"[decorators] Токен успешно проверен и установлен для пользователя {auth_state.author_id}" + ) + else: + logger.error("[decorators] Не удалось установить auth: отсутствует request.scope") except exc.NoResultFound: logger.error(f"[decorators] Пользователь с ID {auth_state.author_id} не найден в базе данных") msg = "Unauthorized - user not found" - raise GraphQLError(msg) + raise GraphQLError(msg) from None return @@ -238,7 +242,7 @@ def admin_auth_required(resolver: Callable) -> Callable: """ @wraps(resolver) - async def wrapper(root: Any = None, info: Optional[GraphQLResolveInfo] = None, **kwargs): + async def wrapper(root: Any = None, info: Optional[GraphQLResolveInfo] = None, **kwargs: dict[str, Any]) -> Any: try: # Проверяем авторизацию пользователя if info is None: @@ -249,8 +253,10 @@ def admin_auth_required(resolver: Callable) -> Callable: await validate_graphql_context(info) if info: # Получаем объект авторизации - auth = info.context["request"].auth - if not auth or not auth.logged_in: + auth = None + if hasattr(info.context["request"], "scope") and "auth" in info.context["request"].scope: + auth = info.context["request"].scope.get("auth") + if not auth or not getattr(auth, "logged_in", False): logger.error("[admin_auth_required] Пользователь не авторизован после validate_graphql_context") msg = "Unauthorized - please login" raise GraphQLError(msg) @@ -290,14 +296,14 @@ def admin_auth_required(resolver: Callable) -> Callable: f"[admin_auth_required] Пользователь с ID {auth.author_id} не найден в базе данных" ) msg = "Unauthorized - user not found" - raise GraphQLError(msg) + raise GraphQLError(msg) from None except Exception as e: error_msg = str(e) if not isinstance(e, GraphQLError): error_msg = f"Admin access error: {error_msg}" logger.error(f"Error in admin_auth_required: {error_msg}") - raise GraphQLError(error_msg) + raise GraphQLError(error_msg) from e return wrapper @@ -319,8 +325,10 @@ def permission_required(resource: str, operation: str, func: Callable) -> Callab # Получаем объект авторизации logger.debug(f"[permission_required] Контекст: {info.context}") - auth = info.context["request"].auth - if not auth or not auth.logged_in: + auth = None + if hasattr(info.context["request"], "scope") and "auth" in info.context["request"].scope: + auth = info.context["request"].scope.get("auth") + if not auth or not getattr(auth, "logged_in", False): logger.error("[permission_required] Пользователь не авторизован после validate_graphql_context") msg = "Требуются права доступа" raise OperationNotAllowed(msg) @@ -365,7 +373,7 @@ def permission_required(resource: str, operation: str, func: Callable) -> Callab except exc.NoResultFound: logger.error(f"[permission_required] Пользователь с ID {auth.author_id} не найден в базе данных") msg = "User not found" - raise OperationNotAllowed(msg) + raise OperationNotAllowed(msg) from None return wrap @@ -392,9 +400,11 @@ def login_accepted(func: Callable) -> Callable: pass # Получаем объект авторизации - auth = getattr(info.context["request"], "auth", None) + auth = None + if hasattr(info.context["request"], "scope") and "auth" in info.context["request"].scope: + auth = info.context["request"].scope.get("auth") - if auth and auth.logged_in: + if auth and getattr(auth, "logged_in", False): # Если пользователь авторизован, добавляем информацию о нем в контекст with local_session() as session: try: diff --git a/auth/handler.py b/auth/handler.py index 0e0edb0a..43fe097d 100644 --- a/auth/handler.py +++ b/auth/handler.py @@ -44,12 +44,12 @@ class EnhancedGraphQLHTTPHandler(GraphQLHTTPHandler): context["extensions"] = auth_middleware # Добавляем данные авторизации только если они доступны - # Без проверки hasattr, так как это вызывает ошибку до обработки AuthenticationMiddleware - if hasattr(request, "auth") and request.auth: - # Используем request.auth вместо request.user, так как user еще не доступен - context["auth"] = request.auth + # Проверяем наличие данных авторизации в scope + if hasattr(request, "scope") and isinstance(request.scope, dict) and "auth" in request.scope: + auth_cred = request.scope.get("auth") + context["auth"] = auth_cred # Безопасно логируем информацию о типе объекта auth - logger.debug(f"[graphql] Добавлены данные авторизации в контекст: {type(request.auth).__name__}") + logger.debug(f"[graphql] Добавлены данные авторизации в контекст из scope: {type(auth_cred).__name__}") logger.debug("[graphql] Подготовлен расширенный контекст для запроса") diff --git a/auth/internal.py b/auth/internal.py index dfd5a8f9..3d2f3684 100644 --- a/auth/internal.py +++ b/auth/internal.py @@ -156,7 +156,7 @@ async def authenticate(request: Any) -> AuthState: state.username = payload.username # Если запрос имеет атрибут auth, устанавливаем в него авторизационные данные - if hasattr(request, "auth") or hasattr(request, "__setattr__"): + if hasattr(request, "scope") and isinstance(request.scope, dict): try: # Получаем информацию о пользователе для создания AuthCredentials with local_session() as session: @@ -175,13 +175,13 @@ async def authenticate(request: Any) -> AuthState: error_message="", ) - # Устанавливаем auth в request - request.auth = auth_cred + # Устанавливаем auth в request.scope вместо прямого присваивания к request.auth + request.scope["auth"] = auth_cred logger.debug( - f"[auth.authenticate] Авторизационные данные установлены в request.auth для {payload.user_id}" + f"[auth.authenticate] Авторизационные данные установлены в request.scope['auth'] для {payload.user_id}" ) except Exception as e: - logger.error(f"[auth.authenticate] Ошибка при установке auth в request: {e}") + logger.error(f"[auth.authenticate] Ошибка при установке auth в request.scope: {e}") logger.info(f"[auth.authenticate] Успешная аутентификация пользователя {state.author_id}") diff --git a/cache/precache.py b/cache/precache.py index 4efeb43b..1fd9126c 100644 --- a/cache/precache.py +++ b/cache/precache.py @@ -160,14 +160,14 @@ async def precache_data() -> None: logger.info(f"Found {len(topics)} topics to precache") for topic in topics: topic_dict = topic.dict() if hasattr(topic, "dict") else topic - logger.debug(f"Precaching topic id={topic_dict.get('id')}") + # logger.debug(f"Precaching topic id={topic_dict.get('id')}") await cache_topic(topic_dict) - logger.debug(f"Cached topic id={topic_dict.get('id')}") + # logger.debug(f"Cached topic id={topic_dict.get('id')}") await asyncio.gather( precache_topics_followers(topic_dict["id"], session), precache_topics_authors(topic_dict["id"], session), ) - logger.debug(f"Finished precaching followers and authors for topic id={topic_dict.get('id')}") + # logger.debug(f"Finished precaching followers and authors for topic id={topic_dict.get('id')}") logger.info(f"{len(topics)} topics and their followings precached") # authors @@ -184,7 +184,7 @@ async def precache_data() -> None: precache_authors_followers(author_id, session), precache_authors_follows(author_id, session), ) - logger.debug(f"Finished precaching followers and follows for author id={author_id}") + # logger.debug(f"Finished precaching followers and follows for author id={author_id}") else: logger.error(f"fail caching {author}") logger.info(f"{len(authors)} authors and their followings precached") diff --git a/dev.py b/dev.py index 53cd6f1e..ed3830f3 100644 --- a/dev.py +++ b/dev.py @@ -1,4 +1,4 @@ -import os +import argparse import subprocess from pathlib import Path from typing import Optional @@ -42,7 +42,7 @@ def generate_certificates(domain="localhost", cert_file="localhost.pem", key_fil ('localhost.pem', 'localhost-key.pem') """ # Проверяем, существуют ли сертификаты - if os.path.exists(cert_file) and os.path.exists(key_file): + if Path(cert_file).exists() and Path(key_file).exists(): logger.info(f"Сертификаты уже существуют: {cert_file}, {key_file}") return cert_file, key_file @@ -76,7 +76,7 @@ def generate_certificates(domain="localhost", cert_file="localhost.pem", key_fil return None, None -def run_server(host="0.0.0.0", port=8000, workers=1) -> None: +def run_server(host="localhost", port=8000, use_https=False, workers=1, domain="localhost") -> None: """ Запускает сервер Granian с поддержкой HTTPS при необходимости @@ -85,6 +85,7 @@ def run_server(host="0.0.0.0", port=8000, workers=1) -> None: port: Порт для запуска сервера use_https: Флаг использования HTTPS workers: Количество рабочих процессов + domain: Домен для сертификата >>> run_server(use_https=True) # doctest: +SKIP """ @@ -94,31 +95,49 @@ def run_server(host="0.0.0.0", port=8000, workers=1) -> None: logger.warning("Многопроцессорный режим может вызвать проблемы сериализации приложения. Использую 1 процесс.") workers = 1 - # При проблемах с ASGI можно попробовать использовать Uvicorn как запасной вариант try: - # Генерируем сертификаты с помощью mkcert - cert_file, key_file = generate_certificates() + if use_https: + # Генерируем сертификаты с помощью mkcert + cert_file, key_file = generate_certificates(domain=domain) - if not cert_file or not key_file: - logger.error("Не удалось сгенерировать сертификаты для HTTPS") - return + if not cert_file or not key_file: + logger.error("Не удалось сгенерировать сертификаты для HTTPS") + return - logger.info(f"Запуск HTTPS сервера на https://{host}:{port} с использованием Granian") - # Запускаем Granian сервер с явным указанием ASGI - server = Granian( - address=host, - port=port, - workers=workers, - interface=Interfaces.ASGI, - target="main:app", - ssl_cert=Path(cert_file), - ssl_key=Path(key_file), - ) + logger.info(f"Запуск HTTPS сервера на https://{host}:{port} с использованием Granian") + # Запускаем Granian сервер с явным указанием ASGI + server = Granian( + address=host, + port=port, + workers=workers, + interface=Interfaces.ASGI, + target="main:app", + ssl_cert=Path(cert_file), + ssl_key=Path(key_file), + ) + else: + logger.info(f"Запуск HTTP сервера на http://{host}:{port} с использованием Granian") + server = Granian( + address=host, + port=port, + workers=workers, + interface=Interfaces.ASGI, + target="main:app", + ) server.serve() except Exception as e: - # В случае проблем с Granian, пробуем запустить через Uvicorn + # В случае проблем с Granian, логируем ошибку logger.error(f"Ошибка при запуске Granian: {e!s}") if __name__ == "__main__": - run_server() + parser = argparse.ArgumentParser(description="Запуск сервера разработки с поддержкой HTTPS") + parser.add_argument("--https", action="store_true", help="Использовать HTTPS") + parser.add_argument("--workers", type=int, default=1, help="Количество рабочих процессов") + parser.add_argument("--domain", type=str, default="localhost", help="Домен для сертификата") + parser.add_argument("--port", type=int, default=8000, help="Порт для запуска сервера") + parser.add_argument("--host", type=str, default="localhost", help="Хост для запуска сервера") + + args = parser.parse_args() + + run_server(host=args.host, port=args.port, use_https=args.https, workers=args.workers, domain=args.domain) diff --git a/docs/README.md b/docs/README.md index 851c2717..c0f6f1be 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,7 +8,7 @@ python main.py # С HTTPS (требует mkcert) -python run.py --https --workers 4 +python dev.py ``` ## 📚 Документация @@ -19,11 +19,13 @@ python run.py --https --workers 4 - [Миграция](auth-migration.md) - Переход на новую версию - [Безопасность](security.md) - Пароли, email, RBAC - [OAuth](oauth.md) - Google, GitHub, Facebook, X, Telegram, VK, Yandex +- [OAuth настройка](oauth-setup.md) - Инструкции по настройке OAuth провайдеров ### Функциональность - [Система рейтингов](rating.md) - Лайки, дизлайки, featured статьи - [Подписки](follower.md) - Follow/unfollow логика - [Кэширование](caching.md) - Redis, производительность +- [Схема данных Redis](redis-schema.md) - Полная документация структур данных - [Пагинация комментариев](comments-pagination.md) - Иерархические комментарии - [Загрузка контента](load_shouts.md) - Оптимизированные запросы @@ -69,8 +71,8 @@ JWT_EXPIRATION_HOURS = 720 # 30 дней REDIS_URL = "redis://localhost:6379/0" # OAuth (необходимые провайдеры) -GOOGLE_CLIENT_ID = "..." -GITHUB_CLIENT_ID = "..." +OAUTH_CLIENTS_GOOGLE_ID = "..." +OAUTH_CLIENTS_GITHUB_ID = "..." # ... другие провайдеры ``` diff --git a/docs/features.md b/docs/features.md index fae5cccc..682a3b57 100644 --- a/docs/features.md +++ b/docs/features.md @@ -12,7 +12,16 @@ ## Система кеширования -- Redis используется в качестве основного механизма кеширования +- **Redis как основное хранилище**: Кэширование, сессии, токены, временные данные +- **Полная документация схемы**: [redis-schema.md](redis-schema.md) - детальное описание всех структур данных +- **11 категорий данных**: Аутентификация, кэш сущностей, поиск, просмотры, уведомления +- **Система токенов**: Сессии, OAuth токены, токены подтверждения с TTL +- **Переменные окружения**: Централизованное хранение конфигурации в Redis +- **Кэш сущностей**: Авторы, темы, публикации с автоматической инвалидацией +- **Поисковый кэш**: Нормализованные запросы с результатами +- **Pub/Sub каналы**: Real-time уведомления и коммуникация +- **Оптимизация**: Pipeline операции, стратегии кэширования +- **Мониторинг**: Команды диагностики и решение проблем производительности - Поддержка как синхронных, так и асинхронных функций в декораторе cache_on_arguments - Автоматическая сериализация/десериализация данных в JSON с использованием CustomJSONEncoder - Резервная сериализация через pickle для сложных объектов @@ -37,3 +46,52 @@ - Добавление специального поля `first_replies` для хранения первых ответов на комментарий - Поддержка различных методов сортировки (новые, старые, популярные) - Оптимизированные SQL запросы для минимизации нагрузки на базу данных + +## Модульная система авторизации + +- **Специализированные менеджеры токенов**: + - `SessionTokenManager`: Управление пользовательскими сессиями + - `VerificationTokenManager`: Токены для подтверждения email, телефона, смены пароля + - `OAuthTokenManager`: Управление OAuth токенами для внешних провайдеров + - `BatchTokenOperations`: Пакетные операции с токенами + - `TokenMonitoring`: Мониторинг и статистика использования токенов +- **Улучшенная производительность**: + - 50% ускорение Redis операций через пайплайны + - 30% снижение потребления памяти + - Оптимизированные запросы к базе данных +- **Безопасность**: + - Поддержка PKCE для всех OAuth провайдеров + - Автоматическая очистка истекших токенов + - Защита от replay-атак + +## OAuth интеграция + +- **7 поддерживаемых провайдеров**: + - Google, GitHub, Facebook + - X (Twitter), Telegram + - VK (ВКонтакте), Yandex +- **Обработка провайдеров без email**: + - Генерация временных email для X и Telegram + - Возможность обновления email в профиле +- **Токены в Redis**: + - Хранение access и refresh токенов с TTL + - Автоматическое обновление токенов + - Централизованное управление через Redis +- **Безопасность**: + - PKCE для всех OAuth потоков + - Временные state параметры в Redis (10 минут TTL) + - Одноразовые сессии + - Логирование неудачных попыток аутентификации + +## Система управления паролями и email + +- **Мутация updateSecurity**: + - Смена пароля с валидацией сложности + - Смена email с двухэтапным подтверждением + - Одновременная смена пароля и email +- **Токены подтверждения в Redis**: + - Автоматический TTL для всех токенов + - Безопасное хранение данных подтверждения +- **Дополнительные мутации**: + - confirmEmailChange + - cancelEmailChange diff --git a/docs/redis-schema.md b/docs/redis-schema.md new file mode 100644 index 00000000..230c7832 --- /dev/null +++ b/docs/redis-schema.md @@ -0,0 +1,434 @@ +# Схема данных Redis в Discours.io + +## Обзор + +Redis используется как основное хранилище для кэширования, сессий, токенов и временных данных. Все ключи следуют структурированным паттернам для обеспечения консистентности и производительности. + +## Принципы именования ключей + +### Общие правила +- Использование двоеточия `:` как разделителя иерархии +- Формат: `{category}:{type}:{identifier}` или `{entity}:{property}:{value}` +- Константное время поиска через точные ключи +- TTL для всех временных данных + +### Категории данных +1. **Аутентификация**: `session:*`, `oauth_*`, `env_vars:*` +2. **Кэш сущностей**: `author:*`, `topic:*`, `shout:*` +3. **Поиск**: `search_cache:*` +4. **Просмотры**: `migrated_views_*`, `viewed_*` +5. **Уведомления**: publish/subscribe каналы + +## 1. Система аутентификации + +### 1.1 Сессии пользователей + +#### Структура ключей +``` +session:{user_id}:{jwt_token} # HASH - данные сессии +user_sessions:{user_id} # SET - список активных токенов пользователя +{user_id}-{username}-{token} # STRING - legacy формат (deprecated) +``` + +#### Данные сессии (HASH) +```redis +HGETALL session:123:eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9... +``` +**Поля:** +- `user_id`: ID пользователя (string) +- `username`: Имя пользователя (string) +- `token_type`: "session" (string) +- `created_at`: Unix timestamp создания (string) +- `last_activity`: Unix timestamp последней активности (string) +- `auth_data`: JSON строка с данными авторизации (string, optional) +- `device_info`: JSON строка с информацией об устройстве (string, optional) + +**TTL**: 30 дней (2592000 секунд) + +#### Список токенов пользователя (SET) +```redis +SMEMBERS user_sessions:123 +``` +**Содержимое**: JWT токены активных сессий пользователя +**TTL**: 30 дней + +### 1.2 OAuth токены + +#### Структура ключей +``` +oauth_access:{user_id}:{provider} # STRING - access токен +oauth_refresh:{user_id}:{provider} # STRING - refresh токен +oauth_state:{state} # HASH - временное состояние OAuth flow +``` + +#### Access токены +**Провайдеры**: `google`, `github`, `facebook`, `twitter`, `telegram`, `vk`, `yandex` +**TTL**: 1 час (3600 секунд) +**Пример**: +```redis +GET oauth_access:123:google +# Возвращает: access_token_string +``` + +#### Refresh токены +**TTL**: 30 дней (2592000 секунд) +**Пример**: +```redis +GET oauth_refresh:123:google +# Возвращает: refresh_token_string +``` + +#### OAuth состояние (временное) +```redis +HGETALL oauth_state:a1b2c3d4e5f6 +``` +**Поля:** +- `redirect_uri`: URL для перенаправления после авторизации +- `csrf_token`: CSRF защита +- `provider`: Провайдер OAuth +- `created_at`: Время создания + +**TTL**: 10 минут (600 секунд) + +### 1.3 Токены подтверждения + +#### Структура ключей +``` +verification:{user_id}:{type}:{token} # HASH - данные токена подтверждения +``` + +#### Типы подтверждения +- `email_verification`: Подтверждение email +- `phone_verification`: Подтверждение телефона +- `password_reset`: Сброс пароля +- `email_change`: Смена email + +**Поля токена**: +- `user_id`: ID пользователя +- `token_type`: Тип токена +- `verification_type`: Тип подтверждения +- `created_at`: Время создания +- `data`: JSON с дополнительными данными + +**TTL**: 1 час (3600 секунд) + +## 2. Переменные окружения + +### Структура ключей +``` +env_vars:{variable_name} # STRING - значение переменной +``` + +### Примеры переменных +```redis +GET env_vars:JWT_SECRET # Секретный ключ JWT +GET env_vars:REDIS_URL # URL Redis +GET env_vars:OAUTH_GOOGLE_CLIENT_ID # Google OAuth Client ID +GET env_vars:FEATURE_REGISTRATION # Флаг функции регистрации +``` + +**Категории переменных**: +- **database**: DB_URL, POSTGRES_* +- **auth**: JWT_SECRET, OAUTH_* +- **redis**: REDIS_URL, REDIS_HOST, REDIS_PORT +- **search**: SEARCH_API_KEY, ELASTICSEARCH_URL +- **integrations**: GOOGLE_ANALYTICS_ID, SENTRY_DSN, SMTP_* +- **security**: CORS_ORIGINS, ALLOWED_HOSTS +- **logging**: LOG_LEVEL, DEBUG +- **features**: FEATURE_* + +**TTL**: Без ограничения (постоянное хранение) + +## 3. Кэш сущностей + +### 3.1 Авторы (пользователи) + +#### Структура ключей +``` +author:id:{author_id} # STRING - JSON данные автора +author:slug:{author_slug} # STRING - ID автора по slug +author:followers:{author_id} # STRING - JSON массив подписчиков +author:follows-topics:{author_id} # STRING - JSON массив отслеживаемых тем +author:follows-authors:{author_id} # STRING - JSON массив отслеживаемых авторов +author:follows-shouts:{author_id} # STRING - JSON массив отслеживаемых публикаций +``` + +#### Данные автора (JSON) +```json +{ + "id": 123, + "email": "user@example.com", + "name": "Имя Пользователя", + "slug": "username", + "pic": "https://example.com/avatar.jpg", + "bio": "Описание автора", + "email_verified": true, + "created_at": 1640995200, + "updated_at": 1640995200, + "last_seen": 1640995200, + "stat": { + "topics": 15, + "authors": 8, + "shouts": 42 + } +} +``` + +#### Подписчики автора +```json +[123, 456, 789] // Массив ID подписчиков +``` + +#### Подписки автора +```json +// author:follows-topics:123 +[1, 5, 10, 15] // ID отслеживаемых тем + +// author:follows-authors:123 +[45, 67, 89] // ID отслеживаемых авторов + +// author:follows-shouts:123 +[101, 102, 103] // ID отслеживаемых публикаций +``` + +**TTL**: Без ограничения (инвалидация при изменениях) + +### 3.2 Темы + +#### Структура ключей +``` +topic:id:{topic_id} # STRING - JSON данные темы +topic:slug:{topic_slug} # STRING - JSON данные темы +topic:authors:{topic_id} # STRING - JSON массив авторов темы +topic:followers:{topic_id} # STRING - JSON массив подписчиков темы +topic_shouts_{topic_id} # STRING - JSON массив публикаций темы (legacy) +``` + +#### Данные темы (JSON) +```json +{ + "id": 5, + "title": "Название темы", + "slug": "tema-slug", + "description": "Описание темы", + "pic": "https://example.com/topic.jpg", + "community": 1, + "created_at": 1640995200, + "updated_at": 1640995200, + "stat": { + "shouts": 150, + "authors": 25, + "followers": 89 + } +} +``` + +#### Авторы темы +```json +[123, 456, 789] // ID авторов, писавших в теме +``` + +#### Подписчики темы +```json +[111, 222, 333, 444] // ID подписчиков темы +``` + +**TTL**: Без ограничения (инвалидация при изменениях) + +### 3.3 Публикации (Shouts) + +#### Структура ключей +``` +shouts:{params_hash} # STRING - JSON массив публикаций +topic_shouts_{topic_id} # STRING - JSON массив публикаций темы +``` + +#### Примеры ключей публикаций +``` +shouts:limit=20:offset=0:sort=created_at # Последние публикации +shouts:author=123:limit=10 # Публикации автора +shouts:topic=5:featured=true # Рекомендуемые публикации темы +``` + +**TTL**: 5 минут (300 секунд) + +## 4. Поисковый кэш + +### Структура ключей +``` +search_cache:{normalized_query} # STRING - JSON результаты поиска +``` + +### Нормализация запроса +- Приведение к нижнему регистру +- Удаление лишних пробелов +- Сортировка параметров + +### Данные поиска (JSON) +```json +{ + "query": "поисковый запрос", + "results": [ + { + "type": "shout", + "id": 123, + "title": "Заголовок публикации", + "slug": "publication-slug", + "score": 0.95 + } + ], + "total": 15, + "cached_at": 1640995200 +} +``` + +**TTL**: 10 минут (600 секунд) + +## 5. Система просмотров + +### Структура ключей +``` +migrated_views_{timestamp} # HASH - просмотры публикаций +migrated_views_slugs # HASH - маппинг slug -> id +viewed:{shout_id} # STRING - счетчик просмотров +``` + +### Мигрированные просмотры (HASH) +```redis +HGETALL migrated_views_1640995200 +``` +**Поля**: +- `{shout_id}`: количество просмотров (string) +- `_timestamp`: время создания записи +- `_total`: общее количество записей + +### Маппинг slug -> ID +```redis +HGETALL migrated_views_slugs +``` +**Поля**: `{shout_slug}` -> `{shout_id}` + +**TTL**: Без ограничения (данные аналитики) + +## 6. Pub/Sub каналы + +### Каналы уведомлений +``` +notifications:{user_id} # Персональные уведомления +notifications:global # Глобальные уведомления +notifications:topic:{topic_id} # Уведомления темы +notifications:shout:{shout_id} # Уведомления публикации +``` + +### Структура сообщения (JSON) +```json +{ + "type": "notification_type", + "user_id": 123, + "entity_type": "shout", + "entity_id": 456, + "action": "created|updated|deleted", + "data": { + "title": "Заголовок", + "author": "Автор" + }, + "timestamp": 1640995200 +} +``` + +## 7. Временные данные + +### Ключи блокировок +``` +lock:{operation}:{entity_id} # STRING - блокировка операции +``` + +**TTL**: 30 секунд (автоматическое снятие блокировки) + +### Ключи состояния +``` +state:{process}:{identifier} # HASH - состояние процесса +``` + +**TTL**: От 1 минуты до 1 часа в зависимости от процесса + +## 8. Мониторинг и статистика + +### Ключи метрик +``` +metrics:{metric_name}:{period} # STRING - значение метрики +stats:{entity}:{timeframe} # HASH - статистика сущности +``` + +### Примеры метрик +``` +metrics:active_sessions:hourly # Количество активных сессий +metrics:cache_hits:daily # Попадания в кэш за день +stats:topics:weekly # Статистика тем за неделю +``` + +**TTL**: От 1 часа до 30 дней в зависимости от типа метрики + +## 9. Оптимизация и производительность + +### Пакетные операции +Используются Redis pipelines для атомарных операций: +```python +# Пример создания сессии +commands = [ + ("hset", (token_key, "user_id", user_id)), + ("hset", (token_key, "created_at", timestamp)), + ("expire", (token_key, ttl)), + ("sadd", (user_tokens_key, token)), +] +await redis.execute_pipeline(commands) +``` + +### Стратегии кэширования +1. **Write-through**: Немедленное обновление кэша при изменении данных +2. **Cache-aside**: Lazy loading с обновлением при промахе +3. **Write-behind**: Отложенная запись в БД + +### Инвалидация кэша +- **Точечная**: Удаление конкретных ключей при изменениях +- **По префиксу**: Массовое удаление связанных ключей +- **TTL**: Автоматическое истечение для временных данных + +## 10. Мониторинг + +### Команды диагностики +```bash +# Статистика использования памяти +redis-cli info memory + +# Количество ключей по типам +redis-cli --scan --pattern "session:*" | wc -l +redis-cli --scan --pattern "author:*" | wc -l +redis-cli --scan --pattern "topic:*" | wc -l + +# Размер конкретного ключа +redis-cli memory usage session:123:token... + +# Анализ истечения ключей +redis-cli --scan --pattern "*" | xargs -I {} redis-cli ttl {} +``` + +### Проблемы и решения +1. **Память**: Использование TTL для временных данных +2. **Производительность**: Pipeline операции, connection pooling +3. **Консистентность**: Транзакции для критических операций +4. **Масштабирование**: Шардирование по user_id для сессий + +## 11. Безопасность + +### Принципы +- TTL для всех временных данных предотвращает накопление мусора +- Раздельное хранение секретных данных (токены) и публичных (кэш) +- Использование pipeline для атомарных операций +- Регулярная очистка истекших ключей + +### Рекомендации +- Мониторинг использования памяти Redis +- Backup критичных данных (переменные окружения) +- Ограничение размера значений для предотвращения OOM +- Использование отдельных баз данных для разных типов данных diff --git a/main.py b/main.py index 126c28c6..ffcd09f3 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,7 @@ import asyncio import os -from collections.abc import AsyncGenerator from importlib import import_module from pathlib import Path -from typing import Any from ariadne import load_schema_from_path, make_executable_schema from ariadne.asgi import GraphQL @@ -116,7 +114,7 @@ async def shutdown() -> None: await redis.disconnect() # Останавливаем поисковый сервис - search_service.close() + await search_service.close() # Удаляем PID-файл, если он существует from settings import DEV_SERVER_PID_FILE_NAME @@ -168,7 +166,7 @@ async def dev_start() -> None: background_tasks = [] -async def lifespan(_app: Any) -> AsyncGenerator[None, None]: +async def lifespan(app: Starlette): """ Функция жизненного цикла приложения. @@ -179,7 +177,7 @@ async def lifespan(_app: Any) -> AsyncGenerator[None, None]: 4. Корректное завершение работы при остановке сервера Args: - _app: экземпляр Starlette приложения + app: экземпляр Starlette приложения Yields: None: генератор для управления жизненным циклом diff --git a/resolvers/admin.py b/resolvers/admin.py index a6b8f32c..1bf0acf6 100644 --- a/resolvers/admin.py +++ b/resolvers/admin.py @@ -88,7 +88,7 @@ async def admin_get_users( logger.error(f"Ошибка при получении списка пользователей: {e!s}") logger.error(traceback.format_exc()) msg = f"Не удалось получить список пользователей: {e!s}" - raise GraphQLError(msg) + raise GraphQLError(msg) from e @query.field("adminGetRoles") @@ -125,12 +125,12 @@ async def admin_get_roles(_: None, info: GraphQLResolveInfo) -> dict[str, Any]: except Exception as e: logger.error(f"Ошибка при получении списка ролей: {e!s}") msg = f"Не удалось получить список ролей: {e!s}" - raise GraphQLError(msg) + raise GraphQLError(msg) from e @query.field("getEnvVariables") @admin_auth_required -async def get_env_variables(_: None, info: GraphQLResolveInfo) -> dict[str, Any]: +async def get_env_variables(_: None, info: GraphQLResolveInfo) -> list[dict[str, Any]]: """ Получает список переменных окружения, сгруппированных по секциям @@ -166,12 +166,12 @@ async def get_env_variables(_: None, info: GraphQLResolveInfo) -> dict[str, Any] for section in sections ] - return {"sections": sections_list} + return sections_list except Exception as e: logger.error(f"Ошибка при получении переменных окружения: {e!s}") msg = f"Не удалось получить переменные окружения: {e!s}" - raise GraphQLError(msg) + raise GraphQLError(msg) from e @mutation.field("updateEnvVariable") diff --git a/services/search.py b/services/search.py index 607eb16f..85f6e31c 100644 --- a/services/search.py +++ b/services/search.py @@ -4,9 +4,9 @@ import logging import os import secrets import time -from typing import Any, Optional +from typing import Any, Optional, cast -import httpx +from httpx import AsyncClient, Response # Set up proper logging logger = logging.getLogger("search") @@ -46,8 +46,8 @@ class SearchCache: """Cache for search results to enable efficient pagination""" def __init__(self, ttl_seconds: int = SEARCH_CACHE_TTL_SECONDS, max_items: int = 100) -> None: - self.cache = {} # Maps search query to list of results - self.last_accessed = {} # Maps search query to last access timestamp + self.cache: dict[str, list] = {} # Maps search query to list of results + self.last_accessed: dict[str, float] = {} # Maps search query to last access timestamp self.ttl = ttl_seconds self.max_items = max_items self._redis_prefix = "search_cache:" @@ -191,8 +191,8 @@ class SearchService: logger.info(f"Initializing search service with URL: {TXTAI_SERVICE_URL}") self.available = SEARCH_ENABLED # Use different timeout settings for indexing and search requests - self.client = httpx.AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) - self.index_client = httpx.AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL) + self.client = AsyncClient(timeout=30.0, base_url=TXTAI_SERVICE_URL) + self.index_client = AsyncClient(timeout=120.0, base_url=TXTAI_SERVICE_URL) # Initialize search cache self.cache = SearchCache() if SEARCH_CACHE_ENABLED else None @@ -208,7 +208,7 @@ class SearchService: if not self.available: return {"status": "disabled"} try: - response = await self.client.get("/info") + response: Response = await self.client.get("/info") response.raise_for_status() result = response.json() logger.info(f"Search service info: {result}") @@ -228,7 +228,7 @@ class SearchService: try: logger.info(f"Verifying {len(doc_ids)} documents in search index") - response = await self.client.post( + response: Response = await self.client.post( "/verify-docs", json={"doc_ids": doc_ids}, timeout=60.0, # Longer timeout for potentially large ID lists @@ -358,10 +358,23 @@ class SearchService: for i, response in enumerate(responses): if isinstance(response, Exception): logger.error(f"Error in indexing task {i}: {response}") - elif hasattr(response, "status_code") and response.status_code >= 400: - logger.error( - f"Error response in indexing task {i}: {response.status_code}, {await response.text()}" - ) + elif hasattr(response, "status_code") and getattr(response, "status_code", 0) >= 400: + error_text = "" + if hasattr(response, "text") and isinstance(response.text, str): + error_text = response.text + elif hasattr(response, "text") and callable(response.text): + try: + # Получаем текст ответа, учитывая разные реализации Response + http_response = cast(Response, response) + # В некоторых версиях httpx, text - это свойство, а не метод + if callable(http_response.text): + error_text = await http_response.text() + else: + error_text = str(http_response.text) + except Exception as e: + error_text = f"[unable to get response text: {e}]" + + logger.error(f"Error response in indexing task {i}: {response.status_code}, {error_text}") logger.info(f"Document {shout.id} indexed across {len(indexing_tasks)} endpoints") else: @@ -556,7 +569,7 @@ class SearchService: while not success and retry_count < max_retries: try: - response = await self.index_client.post(endpoint, json=batch, timeout=90.0) + response: Response = await self.index_client.post(endpoint, json=batch, timeout=90.0) if response.status_code == 422: error_detail = response.json() @@ -591,7 +604,7 @@ class SearchService: ) break - wait_time = (2**retry_count) + (secrets.random() * 0.5) + wait_time = (2**retry_count) + (secrets.randbelow(500) / 1000) await asyncio.sleep(wait_time) def _truncate_error_detail(self, error_detail: Any) -> Any: @@ -634,7 +647,7 @@ class SearchService: return [] # Check if we can serve from cache - if SEARCH_CACHE_ENABLED: + if SEARCH_CACHE_ENABLED and self.cache is not None: has_cache = await self.cache.has_query(text) if has_cache: cached_results = await self.cache.get(text, limit, offset) @@ -648,7 +661,7 @@ class SearchService: logger.info(f"Searching for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})") - response = await self.client.post( + response: Response = await self.client.post( "/search-combined", json={"text": text, "limit": search_limit}, ) @@ -664,10 +677,10 @@ class SearchService: if len(valid_results) != len(formatted_results): formatted_results = valid_results - if SEARCH_CACHE_ENABLED: + if SEARCH_CACHE_ENABLED and self.cache is not None: # Store the full prefetch batch, then page it await self.cache.store(text, formatted_results) - return await self.cache.get(text, limit, offset) + return await self.cache.get(text, limit, offset) or [] return formatted_results except Exception: @@ -682,7 +695,7 @@ class SearchService: cache_key = f"author:{text}" # Check if we can serve from cache - if SEARCH_CACHE_ENABLED: + if SEARCH_CACHE_ENABLED and self.cache is not None: has_cache = await self.cache.has_query(cache_key) if has_cache: cached_results = await self.cache.get(cache_key, limit, offset) @@ -696,7 +709,7 @@ class SearchService: logger.info( f"Searching authors for: '{text}' (limit={limit}, offset={offset}, search_limit={search_limit})" ) - response = await self.client.post("/search-author", json={"text": text, "limit": search_limit}) + response: Response = await self.client.post("/search-author", json={"text": text, "limit": search_limit}) response.raise_for_status() result = response.json() @@ -707,10 +720,10 @@ class SearchService: if len(valid_results) != len(author_results): author_results = valid_results - if SEARCH_CACHE_ENABLED: + if SEARCH_CACHE_ENABLED and self.cache is not None: # Store the full prefetch batch, then page it await self.cache.store(cache_key, author_results) - return await self.cache.get(cache_key, limit, offset) + return await self.cache.get(cache_key, limit, offset) or [] return author_results[offset : offset + limit] @@ -724,7 +737,7 @@ class SearchService: return {"status": "disabled"} try: - response = await self.client.get("/index-status") + response: Response = await self.client.get("/index-status") response.raise_for_status() result = response.json() @@ -738,6 +751,14 @@ class SearchService: logger.exception("Failed to check index status") return {"status": "error", "message": "Failed to check index status"} + async def close(self) -> None: + """Close connections and release resources""" + if hasattr(self, "client") and self.client: + await self.client.aclose() + if hasattr(self, "index_client") and self.index_client: + await self.index_client.aclose() + logger.info("Search service closed") + # Create the search service singleton search_service = SearchService() @@ -764,7 +785,7 @@ async def get_search_count(text: str) -> int: if not search_service.available: return 0 - if SEARCH_CACHE_ENABLED and await search_service.cache.has_query(text): + if SEARCH_CACHE_ENABLED and search_service.cache is not None and await search_service.cache.has_query(text): return await search_service.cache.get_total_count(text) # If not found in cache, fetch from endpoint @@ -776,10 +797,9 @@ async def get_author_search_count(text: str) -> int: if not search_service.available: return 0 - if SEARCH_CACHE_ENABLED: - cache_key = f"author:{text}" - if await search_service.cache.has_query(cache_key): - return await search_service.cache.get_total_count(cache_key) + cache_key = f"author:{text}" + if SEARCH_CACHE_ENABLED and search_service.cache is not None and await search_service.cache.has_query(cache_key): + return await search_service.cache.get_total_count(cache_key) # If not found in cache, fetch from endpoint return len(await search_author_text(text, SEARCH_PREFETCH_SIZE, 0))