This commit is contained in:
34
tests/auth/test_auth_service.py
Normal file
34
tests/auth/test_auth_service.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import pytest
|
||||
from services.auth import AuthService
|
||||
from services.db import local_session
|
||||
from auth.orm import Author
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ensure_user_has_reader_role():
|
||||
auth_service = AuthService()
|
||||
|
||||
# Создаем тестового пользователя без роли reader
|
||||
with local_session() as session:
|
||||
test_author = Author(
|
||||
email="test_reader_role@example.com",
|
||||
slug="test_reader_role",
|
||||
password="test_password"
|
||||
)
|
||||
session.add(test_author)
|
||||
session.commit()
|
||||
user_id = test_author.id
|
||||
|
||||
# Проверяем, что роль reader добавляется
|
||||
result = await auth_service.ensure_user_has_reader_role(user_id)
|
||||
assert result is True
|
||||
|
||||
# Проверяем, что при повторном вызове возвращается True
|
||||
result = await auth_service.ensure_user_has_reader_role(user_id)
|
||||
assert result is True
|
||||
|
||||
# Очищаем тестовые данные
|
||||
with local_session() as session:
|
||||
test_author = session.query(Author).filter_by(id=user_id).first()
|
||||
if test_author:
|
||||
session.delete(test_author)
|
||||
session.commit()
|
13
tests/auth/test_identity.py
Normal file
13
tests/auth/test_identity.py
Normal file
@@ -0,0 +1,13 @@
|
||||
import pytest
|
||||
from auth.identity import Password
|
||||
|
||||
def test_password_verify():
|
||||
# Создаем пароль
|
||||
original_password = "test_password123"
|
||||
hashed_password = Password.encode(original_password)
|
||||
|
||||
# Проверяем корректный пароль
|
||||
assert Password.verify(original_password, hashed_password) is True
|
||||
|
||||
# Проверяем некорректный пароль
|
||||
assert Password.verify("wrong_password", hashed_password) is False
|
@@ -227,3 +227,51 @@ with (
|
||||
assert created_user is not None
|
||||
assert created_user.name == "Test User"
|
||||
assert created_user.email_verified is True
|
||||
|
||||
# Импортируем необходимые модели
|
||||
from orm.community import Community, CommunityAuthor
|
||||
|
||||
@pytest.fixture
|
||||
def test_community(oauth_db_session, simple_user):
|
||||
"""
|
||||
Создает тестовое сообщество с ожидаемыми ролями по умолчанию
|
||||
|
||||
Args:
|
||||
oauth_db_session: Сессия базы данных для теста
|
||||
simple_user: Пользователь для создания сообщества
|
||||
|
||||
Returns:
|
||||
Community: Созданное тестовое сообщество
|
||||
"""
|
||||
# Очищаем существующие записи
|
||||
oauth_db_session.query(Community).filter(
|
||||
(Community.id == 300) | (Community.slug == "test-oauth-community")
|
||||
).delete()
|
||||
oauth_db_session.commit()
|
||||
|
||||
# Создаем тестовое сообщество
|
||||
community = Community(
|
||||
id=300,
|
||||
name="Test OAuth Community",
|
||||
slug="test-oauth-community",
|
||||
desc="Community for OAuth tests",
|
||||
created_by=simple_user.id,
|
||||
settings={
|
||||
"default_roles": ["reader", "author"],
|
||||
"available_roles": ["reader", "author", "editor"]
|
||||
}
|
||||
)
|
||||
oauth_db_session.add(community)
|
||||
oauth_db_session.commit()
|
||||
|
||||
yield community
|
||||
|
||||
# Очистка после теста
|
||||
try:
|
||||
oauth_db_session.query(CommunityAuthor).filter(
|
||||
CommunityAuthor.community_id == community.id
|
||||
).delete()
|
||||
oauth_db_session.query(Community).filter(Community.id == community.id).delete()
|
||||
oauth_db_session.commit()
|
||||
except Exception:
|
||||
oauth_db_session.rollback()
|
||||
|
@@ -14,38 +14,45 @@ from auth.tokens.storage import TokenStorage
|
||||
async def test_token_storage(redis_client):
|
||||
"""Тест базовой функциональности TokenStorage с правильными fixtures"""
|
||||
|
||||
print("✅ Тестирование TokenStorage...")
|
||||
try:
|
||||
print("✅ Тестирование TokenStorage...")
|
||||
|
||||
# Тест создания сессии
|
||||
print("1. Создание сессии...")
|
||||
token = await TokenStorage.create_session(user_id="test_user_123", username="test_user", device_info={"test": True})
|
||||
print(f" Создан токен: {token[:20]}...")
|
||||
# Тест создания сессии
|
||||
print("1. Создание сессии...")
|
||||
token = await TokenStorage.create_session(user_id="test_user_123", username="test_user", device_info={"test": True})
|
||||
print(f" Создан токен: {token[:20]}...")
|
||||
|
||||
# Тест проверки сессии
|
||||
print("2. Проверка сессии...")
|
||||
session_data = await TokenStorage.verify_session(token)
|
||||
if session_data:
|
||||
print(f" Сессия найдена для user_id: {session_data.user_id}")
|
||||
else:
|
||||
print(" ❌ Сессия не найдена")
|
||||
return False
|
||||
# Тест проверки сессии
|
||||
print("2. Проверка сессии...")
|
||||
session_data = await TokenStorage.verify_session(token)
|
||||
if session_data:
|
||||
print(f" Сессия найдена для user_id: {session_data.user_id}")
|
||||
else:
|
||||
print(" ❌ Сессия не найдена")
|
||||
return False
|
||||
|
||||
# Тест прямого использования SessionTokenManager
|
||||
print("3. Прямое использование SessionTokenManager...")
|
||||
sessions = SessionTokenManager()
|
||||
valid, data = await sessions.validate_session_token(token)
|
||||
print(f" Валидация: {valid}, данные: {bool(data)}")
|
||||
# Тест прямого использования SessionTokenManager
|
||||
print("3. Прямое использование SessionTokenManager...")
|
||||
sessions = SessionTokenManager()
|
||||
valid, data = await sessions.validate_session_token(token)
|
||||
print(f" Валидация: {valid}, данные: {bool(data)}")
|
||||
|
||||
# Тест мониторинга
|
||||
print("4. Мониторинг токенов...")
|
||||
monitoring = TokenMonitoring()
|
||||
stats = await monitoring.get_token_statistics()
|
||||
print(f" Активных сессий: {stats.get('session_tokens', 0)}")
|
||||
# Тест мониторинга
|
||||
print("4. Мониторинг токенов...")
|
||||
monitoring = TokenMonitoring()
|
||||
stats = await monitoring.get_token_statistics()
|
||||
print(f" Активных сессий: {stats.get('session_tokens', 0)}")
|
||||
|
||||
# Очистка
|
||||
print("5. Отзыв сессии...")
|
||||
revoked = await TokenStorage.revoke_session(token)
|
||||
print(f" Отозван: {revoked}")
|
||||
# Очистка
|
||||
print("5. Отзыв сессии...")
|
||||
revoked = await TokenStorage.revoke_session(token)
|
||||
print(f" Отозван: {revoked}")
|
||||
|
||||
print("✅ Все тесты пройдены успешно!")
|
||||
return True
|
||||
print("✅ Все тесты пройдены успешно!")
|
||||
return True
|
||||
finally:
|
||||
# Безопасное закрытие клиента с использованием aclose()
|
||||
if hasattr(redis_client, 'aclose'):
|
||||
await redis_client.aclose()
|
||||
elif hasattr(redis_client, 'close'):
|
||||
await redis_client.close()
|
||||
|
@@ -3,7 +3,7 @@ from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import StaticPool
|
||||
|
||||
from services.db import Base
|
||||
from orm.base import BaseModel as Base
|
||||
from services.redis import redis
|
||||
from tests.test_config import get_test_client
|
||||
|
||||
|
@@ -3,7 +3,7 @@
|
||||
|
||||
Проверяет работу AdminService и AuthService с RBAC системой.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import pytest
|
||||
|
||||
from auth.orm import Author
|
||||
@@ -11,6 +11,8 @@ from orm.community import Community, CommunityAuthor
|
||||
from services.admin import admin_service
|
||||
from services.auth import auth_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_user(db_session):
|
||||
@@ -36,7 +38,7 @@ def simple_user(db_session):
|
||||
# Очистка после теста
|
||||
try:
|
||||
# Удаляем связанные записи CommunityAuthor
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete()
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete(synchronize_session=False)
|
||||
# Удаляем самого пользователя
|
||||
db_session.query(Author).filter(Author.id == user.id).delete()
|
||||
db_session.commit()
|
||||
@@ -48,17 +50,18 @@ def simple_user(db_session):
|
||||
def simple_community(db_session, simple_user):
|
||||
"""Создает простое тестовое сообщество"""
|
||||
# Очищаем любые существующие записи с этим ID/slug
|
||||
db_session.query(Community).filter(
|
||||
(Community.id == 200) | (Community.slug == "simple-test-community")
|
||||
).delete()
|
||||
db_session.query(Community).filter(Community.slug == "simple-test-community").delete()
|
||||
db_session.commit()
|
||||
|
||||
community = Community(
|
||||
id=200,
|
||||
name="Simple Test Community",
|
||||
slug="simple-test-community",
|
||||
desc="Simple community for tests",
|
||||
created_by=simple_user.id,
|
||||
settings={
|
||||
"default_roles": ["reader", "author"],
|
||||
"available_roles": ["reader", "author", "editor"]
|
||||
}
|
||||
)
|
||||
db_session.add(community)
|
||||
db_session.commit()
|
||||
@@ -76,6 +79,52 @@ def simple_community(db_session, simple_user):
|
||||
db_session.rollback()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_community(db_session, simple_user):
|
||||
"""
|
||||
Создает тестовое сообщество с ожидаемыми ролями по умолчанию
|
||||
|
||||
Args:
|
||||
db_session: Сессия базы данных для теста
|
||||
simple_user: Пользователь для создания сообщества
|
||||
|
||||
Returns:
|
||||
Community: Созданное тестовое сообщество
|
||||
"""
|
||||
# Очищаем существующие записи
|
||||
db_session.query(Community).filter(Community.slug == "test-rbac-community").delete()
|
||||
db_session.commit()
|
||||
|
||||
community = Community(
|
||||
name="Test RBAC Community",
|
||||
slug="test-rbac-community",
|
||||
desc="Community for RBAC tests",
|
||||
created_by=simple_user.id,
|
||||
settings={
|
||||
"default_roles": ["reader", "author"],
|
||||
"available_roles": ["reader", "author", "editor"]
|
||||
}
|
||||
)
|
||||
db_session.add(community)
|
||||
db_session.flush() # Получаем ID без коммита
|
||||
|
||||
logger.info(f"DEBUG: Создание Community с айди {community.id}")
|
||||
|
||||
db_session.commit()
|
||||
|
||||
yield community
|
||||
|
||||
# Очистка после теста
|
||||
try:
|
||||
# Удаляем связанные записи CommunityAuthor
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.community_id == community.id).delete()
|
||||
# Удаляем сообщество
|
||||
db_session.query(Community).filter(Community.id == community.id).delete()
|
||||
db_session.commit()
|
||||
except Exception:
|
||||
db_session.rollback()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_test_users(db_session):
|
||||
"""Автоматически очищает тестовые записи пользователей перед каждым тестом"""
|
||||
@@ -96,7 +145,7 @@ def cleanup_test_users(db_session):
|
||||
existing_user = db_session.query(Author).filter(Author.email == email).first()
|
||||
if existing_user:
|
||||
# Удаляем связанные записи CommunityAuthor
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == existing_user.id).delete()
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == existing_user.id).delete(synchronize_session=False)
|
||||
# Удаляем пользователя
|
||||
db_session.delete(existing_user)
|
||||
db_session.commit()
|
||||
@@ -154,70 +203,101 @@ class TestSimpleAdminService:
|
||||
# Может быть пустой список или содержать системную роль админа
|
||||
assert len(roles) >= 0
|
||||
|
||||
def test_get_user_roles_with_roles(self, db_session, simple_user, simple_community):
|
||||
def test_get_user_roles_with_roles(self, db_session, simple_user, test_community):
|
||||
"""Тест получения ролей пользователя"""
|
||||
# Используем дефолтное сообщество (ID=1) для совместимости с AdminService
|
||||
default_community_id = 1
|
||||
# Используем тестовое сообщество
|
||||
community_id = test_community.id
|
||||
|
||||
print(f"DEBUG: user_id={simple_user.id}, community_id={default_community_id}")
|
||||
# Отладочная информация о тестовом сообществе
|
||||
logger.info(f"DEBUG: Тестовое сообщество ID: {community_id}")
|
||||
logger.info(f"DEBUG: Тестовое сообщество slug: {test_community.slug}")
|
||||
logger.info(f"DEBUG: Тестовое сообщество settings: {test_community.settings}")
|
||||
|
||||
# Полностью очищаем все существующие CommunityAuthor для пользователя
|
||||
existing_community_authors = db_session.query(CommunityAuthor).filter(
|
||||
CommunityAuthor.author_id == simple_user.id
|
||||
).all()
|
||||
|
||||
# Отладочная информация
|
||||
logger.info(f"DEBUG: Найдено существующих CommunityAuthor: {len(existing_community_authors)}")
|
||||
for ca in existing_community_authors:
|
||||
logger.info(f"DEBUG: Существующий CA - community_id: {ca.community_id}, roles: {ca.roles}")
|
||||
db_session.delete(ca)
|
||||
|
||||
# Очищаем существующие роли
|
||||
deleted_count = db_session.query(CommunityAuthor).filter(
|
||||
CommunityAuthor.author_id == simple_user.id,
|
||||
CommunityAuthor.community_id == default_community_id
|
||||
).delete()
|
||||
db_session.commit()
|
||||
print(f"DEBUG: Удалено записей CommunityAuthor: {deleted_count}")
|
||||
|
||||
# Создаем CommunityAuthor с ролями в дефолтном сообществе
|
||||
# Создаем CommunityAuthor с ролями в тестовом сообществе
|
||||
ca = CommunityAuthor(
|
||||
community_id=default_community_id,
|
||||
community_id=community_id,
|
||||
author_id=simple_user.id,
|
||||
)
|
||||
|
||||
# Расширенная отладка перед set_roles
|
||||
logger.info(f"DEBUG: Перед set_roles")
|
||||
logger.info(f"DEBUG: ca.roles до set_roles: {ca.roles}")
|
||||
logger.info(f"DEBUG: ca.role_list до set_roles: {ca.role_list}")
|
||||
|
||||
ca.set_roles(["reader", "author"])
|
||||
print(f"DEBUG: Установлены роли: {ca.role_list}")
|
||||
|
||||
# Расширенная отладка после set_roles
|
||||
logger.info(f"DEBUG: После set_roles")
|
||||
logger.info(f"DEBUG: ca.roles после set_roles: {ca.roles}")
|
||||
logger.info(f"DEBUG: ca.role_list после set_roles: {ca.role_list}")
|
||||
|
||||
db_session.add(ca)
|
||||
db_session.commit()
|
||||
print(f"DEBUG: CA сохранен в БД с ID: {ca.id}")
|
||||
|
||||
# Проверяем что роли сохранились в БД
|
||||
saved_ca = db_session.query(CommunityAuthor).filter(
|
||||
# Явная проверка сохранения CommunityAuthor
|
||||
check_ca = db_session.query(CommunityAuthor).filter(
|
||||
CommunityAuthor.author_id == simple_user.id,
|
||||
CommunityAuthor.community_id == default_community_id
|
||||
CommunityAuthor.community_id == community_id
|
||||
).first()
|
||||
assert saved_ca is not None
|
||||
print(f"DEBUG: Сохраненные роли в БД: {saved_ca.role_list}")
|
||||
assert "reader" in saved_ca.role_list
|
||||
assert "author" in saved_ca.role_list
|
||||
|
||||
# Проверяем роли через AdminService (использует дефолтное сообщество)
|
||||
logger.info(f"DEBUG: Проверка сохраненной записи CommunityAuthor")
|
||||
logger.info(f"DEBUG: Найденная запись: {check_ca}")
|
||||
logger.info(f"DEBUG: Роли в найденной записи: {check_ca.roles}")
|
||||
logger.info(f"DEBUG: role_list найденной записи: {check_ca.role_list}")
|
||||
|
||||
assert check_ca is not None, "CommunityAuthor должен быть сохранен в базе данных"
|
||||
assert check_ca.roles is not None, "Роли CommunityAuthor не должны быть None"
|
||||
assert "reader" in check_ca.role_list, "Роль 'reader' должна быть в role_list"
|
||||
assert "author" in check_ca.role_list, "Роль 'author' должна быть в role_list"
|
||||
|
||||
# Проверяем роли через AdminService
|
||||
from services.admin import admin_service
|
||||
from services.db import local_session
|
||||
|
||||
# Используем ту же сессию для проверки
|
||||
fresh_user = db_session.query(Author).filter(Author.id == simple_user.id).first()
|
||||
roles = admin_service.get_user_roles(fresh_user) # Без указания community_id - использует дефолт
|
||||
print(f"DEBUG: AdminService вернул роли: {roles}")
|
||||
assert "reader" in roles
|
||||
assert "author" in roles
|
||||
roles = admin_service.get_user_roles(fresh_user, community_id)
|
||||
|
||||
# Проверяем роли
|
||||
assert isinstance(roles, list), "Роли должны быть списком"
|
||||
assert "reader" in roles, "Роль 'reader' должна присутствовать"
|
||||
assert "author" in roles, "Роль 'author' должна присутствовать"
|
||||
assert len(roles) == 2, f"Должно быть 2 роли, а не {len(roles)}"
|
||||
|
||||
def test_update_user_success(self, db_session, simple_user):
|
||||
"""Тест успешного обновления пользователя"""
|
||||
original_name = simple_user.name
|
||||
from services.admin import admin_service
|
||||
|
||||
user_data = {
|
||||
# Обновляем пользователя
|
||||
result = admin_service.update_user({
|
||||
"id": simple_user.id,
|
||||
"email": simple_user.email,
|
||||
"name": "Updated Name",
|
||||
"roles": ["reader"]
|
||||
}
|
||||
"email": simple_user.email
|
||||
})
|
||||
|
||||
result = admin_service.update_user(user_data)
|
||||
assert result["success"] is True
|
||||
# Проверяем обновленного пользователя
|
||||
assert result is not None, "Пользователь должен быть обновлен"
|
||||
assert result.get("name") == "Updated Name", "Имя пользователя должно быть обновлено"
|
||||
|
||||
# Получаем обновленного пользователя из БД заново
|
||||
updated_user = db_session.query(Author).filter(Author.id == simple_user.id).first()
|
||||
assert updated_user.name == "Updated Name"
|
||||
|
||||
# Восстанавливаем исходное имя для других тестов
|
||||
updated_user.name = original_name
|
||||
db_session.commit()
|
||||
# Восстанавливаем исходное имя
|
||||
admin_service.update_user({
|
||||
"id": simple_user.id,
|
||||
"name": "Simple User",
|
||||
"email": simple_user.email
|
||||
})
|
||||
|
||||
|
||||
class TestSimpleAuthService:
|
||||
@@ -227,12 +307,15 @@ class TestSimpleAuthService:
|
||||
"""Тест базового создания пользователя"""
|
||||
test_email = "test_create_unique@example.com"
|
||||
|
||||
# Удаляем пользователя если существует
|
||||
existing = db_session.query(Author).filter(Author.email == test_email).first()
|
||||
if existing:
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == existing.id).delete()
|
||||
db_session.delete(existing)
|
||||
db_session.commit()
|
||||
# Найдем существующих пользователей с таким email
|
||||
existing_users = db_session.query(Author).filter(Author.email == test_email).all()
|
||||
|
||||
# Удаляем связанные записи CommunityAuthor для существующих пользователей
|
||||
for user in existing_users:
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete(synchronize_session=False)
|
||||
db_session.delete(user)
|
||||
|
||||
db_session.commit()
|
||||
|
||||
user_dict = {
|
||||
"email": test_email,
|
||||
@@ -247,37 +330,102 @@ class TestSimpleAuthService:
|
||||
assert user.name == "Test Create User"
|
||||
|
||||
# Очистка
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete()
|
||||
db_session.delete(user)
|
||||
db_session.commit()
|
||||
|
||||
def test_create_user_with_community(self, db_session, simple_community):
|
||||
"""Тест создания пользователя с привязкой к сообществу"""
|
||||
test_email = "test_community_unique@example.com"
|
||||
|
||||
# Удаляем пользователя если существует
|
||||
existing = db_session.query(Author).filter(Author.email == test_email).first()
|
||||
if existing:
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == existing.id).delete()
|
||||
db_session.delete(existing)
|
||||
try:
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete(synchronize_session=False)
|
||||
db_session.delete(user)
|
||||
db_session.commit()
|
||||
except Exception as e:
|
||||
# Если возникла ошибка при удалении, просто логируем ее
|
||||
print(f"Ошибка при очистке: {e}")
|
||||
db_session.rollback()
|
||||
|
||||
def test_create_user_with_community(self, db_session):
|
||||
"""Проверяем создание пользователя в конкретном сообществе"""
|
||||
from services.auth import auth_service
|
||||
from services.rbac import initialize_community_permissions
|
||||
from auth.orm import Author
|
||||
import asyncio
|
||||
import uuid
|
||||
|
||||
# Создаем тестового пользователя
|
||||
system_author = db_session.query(Author).filter(Author.slug == "system").first()
|
||||
if not system_author:
|
||||
system_author = Author(
|
||||
name="System",
|
||||
slug="system",
|
||||
email="system@test.local"
|
||||
)
|
||||
db_session.add(system_author)
|
||||
db_session.flush()
|
||||
|
||||
# Создаем тестовое сообщество
|
||||
unique_slug = f"simple-test-community-{uuid.uuid4()}"
|
||||
community = Community(
|
||||
name="Simple Test Community",
|
||||
slug=unique_slug,
|
||||
desc="Simple community for tests",
|
||||
created_by=system_author.id,
|
||||
settings={
|
||||
"default_roles": ["reader", "author"],
|
||||
"available_roles": ["reader", "author", "editor"]
|
||||
}
|
||||
)
|
||||
db_session.add(community)
|
||||
db_session.flush()
|
||||
|
||||
# Инициализируем права сообщества
|
||||
async def init_community_permissions():
|
||||
await initialize_community_permissions(community.id)
|
||||
|
||||
# Запускаем инициализацию в текущем event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(init_community_permissions())
|
||||
|
||||
# Генерируем уникальные данные для каждого теста
|
||||
unique_email = f"test_community_unique_{uuid.uuid4()}@example.com"
|
||||
unique_name = f"Test Community User {uuid.uuid4()}"
|
||||
unique_slug = f"test-community-user-{uuid.uuid4()}"
|
||||
|
||||
user_dict = {
|
||||
"email": test_email,
|
||||
"name": "Test Community User",
|
||||
"slug": "test-community-user-unique",
|
||||
"name": unique_name,
|
||||
"email": unique_email,
|
||||
"slug": unique_slug
|
||||
}
|
||||
|
||||
user = auth_service.create_user(user_dict, community_id=simple_community.id)
|
||||
# Создаем пользователя в конкретном сообществе
|
||||
user = auth_service.create_user(user_dict, community_id=community.id)
|
||||
|
||||
assert user is not None
|
||||
assert user.email == test_email
|
||||
# Проверяем созданного пользователя
|
||||
assert user is not None, "Пользователь должен быть создан"
|
||||
assert user.email == unique_email.lower(), "Email должен быть в нижнем регистре"
|
||||
assert user.name == unique_name, "Имя пользователя должно совпадать"
|
||||
assert user.slug == unique_slug, "Slug пользователя должен совпадать"
|
||||
|
||||
# Очистка
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete()
|
||||
db_session.delete(user)
|
||||
# Проверяем роли
|
||||
from orm.community import get_user_roles_in_community
|
||||
|
||||
# Получаем роли
|
||||
roles = get_user_roles_in_community(user.id, community_id=community.id)
|
||||
|
||||
# Проверяем роли
|
||||
assert "reader" in roles, f"У нового пользователя должна быть роль 'reader' в сообществе {community.id}. Текущие роли: {roles}"
|
||||
assert "author" in roles, f"У нового пользователя должна быть роль 'author' в сообществе {community.id}. Текущие роли: {roles}"
|
||||
|
||||
# Коммитим изменения
|
||||
db_session.commit()
|
||||
|
||||
# Очищаем созданные объекты
|
||||
try:
|
||||
# Удаляем связанные записи CommunityAuthor
|
||||
db_session.query(CommunityAuthor).filter(CommunityAuthor.author_id == user.id).delete()
|
||||
# Удаляем пользователя
|
||||
db_session.query(Author).filter(Author.id == user.id).delete()
|
||||
# Удаляем сообщество
|
||||
db_session.query(Community).filter(Community.id == community.id).delete()
|
||||
db_session.commit()
|
||||
except Exception:
|
||||
db_session.rollback()
|
||||
|
||||
|
||||
class TestCommunityAuthorMethods:
|
||||
"""Тесты методов CommunityAuthor"""
|
||||
|
Reference in New Issue
Block a user