core/auth/permissions.py

241 lines
10 KiB
Python
Raw Normal View History

2025-05-16 06:23:48 +00:00
"""
Модуль для проверки разрешений пользователей в контексте сообществ.
Позволяет проверять доступ пользователя к определенным операциям в сообществе
на основе его роли в этом сообществе.
"""
from typing import List, Union
from sqlalchemy.orm import Session
2025-05-29 09:37:39 +00:00
from auth.orm import Author, Permission, Role, RolePermission
2025-05-16 06:23:48 +00:00
from orm.community import Community, CommunityFollower, CommunityRole
2025-05-29 09:37:39 +00:00
from settings import ADMIN_EMAILS as ADMIN_EMAILS_LIST
2025-05-16 06:23:48 +00:00
ADMIN_EMAILS = ADMIN_EMAILS_LIST.split(",")
class ContextualPermissionCheck:
"""
Класс для проверки контекстно-зависимых разрешений.
Позволяет проверять разрешения пользователя в контексте сообщества,
учитывая как глобальные роли пользователя, так и его роли внутри сообщества.
"""
# Маппинг из ролей сообщества в системные роли RBAC
COMMUNITY_ROLE_MAP = {
CommunityRole.READER: "community_reader",
CommunityRole.AUTHOR: "community_author",
CommunityRole.EXPERT: "community_expert",
CommunityRole.EDITOR: "community_editor",
}
# Обратное отображение для отображения системных ролей в роли сообщества
RBAC_TO_COMMUNITY_ROLE = {v: k for k, v in COMMUNITY_ROLE_MAP.items()}
@staticmethod
def check_community_permission(
session: Session, author_id: int, community_slug: str, resource: str, operation: str
) -> bool:
"""
Проверяет наличие разрешения у пользователя в контексте сообщества.
Args:
session: Сессия SQLAlchemy
author_id: ID автора/пользователя
community_slug: Slug сообщества
resource: Ресурс для доступа
operation: Операция над ресурсом
Returns:
bool: True, если пользователь имеет разрешение, иначе False
"""
# 1. Проверка глобальных разрешений (например, администратор)
author = session.query(Author).filter(Author.id == author_id).one_or_none()
if not author:
return False
# Если это администратор (по списку email) или у него есть глобальное разрешение
if author.has_permission(resource, operation) or author.email in ADMIN_EMAILS:
return True
# 2. Проверка разрешений в контексте сообщества
# Получаем информацию о сообществе
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
if not community:
return False
# Если автор является создателем сообщества, то у него есть полные права
if community.created_by == author_id:
return True
# Получаем роли пользователя в этом сообществе
community_follower = (
session.query(CommunityFollower)
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
.one_or_none()
)
if not community_follower or not community_follower.roles:
# Пользователь не является членом сообщества или у него нет ролей
return False
# Преобразуем роли сообщества в RBAC роли
rbac_roles = []
community_roles = community_follower.get_roles()
for role in community_roles:
if role in ContextualPermissionCheck.COMMUNITY_ROLE_MAP:
rbac_role_id = ContextualPermissionCheck.COMMUNITY_ROLE_MAP[role]
rbac_roles.append(rbac_role_id)
if not rbac_roles:
return False
# Проверяем наличие разрешения для этих ролей
permission_id = f"{resource}:{operation}"
# Запрос на проверку разрешений для указанных ролей
has_permission = (
session.query(RolePermission)
.join(Role, Role.id == RolePermission.role)
.join(Permission, Permission.id == RolePermission.permission)
.filter(Role.id.in_(rbac_roles), Permission.id == permission_id)
.first()
is not None
)
return has_permission
@staticmethod
2025-05-29 09:37:39 +00:00
def get_user_community_roles(session: Session, author_id: int, community_slug: str) -> List[CommunityRole]:
2025-05-16 06:23:48 +00:00
"""
Получает список ролей пользователя в сообществе.
Args:
session: Сессия SQLAlchemy
author_id: ID автора/пользователя
community_slug: Slug сообщества
Returns:
List[CommunityRole]: Список ролей пользователя в сообществе
"""
# Получаем информацию о сообществе
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
if not community:
return []
# Если автор является создателем сообщества, то у него есть роль владельца
if community.created_by == author_id:
return [CommunityRole.EDITOR] # Владелец имеет роль редактора по умолчанию
# Получаем роли пользователя в этом сообществе
community_follower = (
session.query(CommunityFollower)
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
.one_or_none()
)
if not community_follower or not community_follower.roles:
return []
return community_follower.get_roles()
@staticmethod
def assign_role_to_user(
session: Session, author_id: int, community_slug: str, role: Union[CommunityRole, str]
) -> bool:
"""
Назначает роль пользователю в сообществе.
Args:
session: Сессия SQLAlchemy
author_id: ID автора/пользователя
community_slug: Slug сообщества
role: Роль для назначения (CommunityRole или строковое представление)
Returns:
bool: True если роль успешно назначена, иначе False
"""
# Преобразуем строковую роль в CommunityRole если нужно
if isinstance(role, str):
try:
role = CommunityRole(role)
except ValueError:
return False
# Получаем информацию о сообществе
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
if not community:
return False
# Проверяем существование связи автор-сообщество
community_follower = (
session.query(CommunityFollower)
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
.one_or_none()
)
if not community_follower:
# Создаем новую запись CommunityFollower
community_follower = CommunityFollower(author=author_id, community=community.id)
session.add(community_follower)
# Назначаем роль
current_roles = community_follower.get_roles() if community_follower.roles else []
if role not in current_roles:
current_roles.append(role)
community_follower.set_roles(current_roles)
session.commit()
return True
@staticmethod
def revoke_role_from_user(
session: Session, author_id: int, community_slug: str, role: Union[CommunityRole, str]
) -> bool:
"""
Отзывает роль у пользователя в сообществе.
Args:
session: Сессия SQLAlchemy
author_id: ID автора/пользователя
community_slug: Slug сообщества
role: Роль для отзыва (CommunityRole или строковое представление)
Returns:
bool: True если роль успешно отозвана, иначе False
"""
# Преобразуем строковую роль в CommunityRole если нужно
if isinstance(role, str):
try:
role = CommunityRole(role)
except ValueError:
return False
# Получаем информацию о сообществе
community = session.query(Community).filter(Community.slug == community_slug).one_or_none()
if not community:
return False
# Проверяем существование связи автор-сообщество
community_follower = (
session.query(CommunityFollower)
.filter(CommunityFollower.author == author_id, CommunityFollower.community == community.id)
.one_or_none()
)
if not community_follower or not community_follower.roles:
return False
# Отзываем роль
current_roles = community_follower.get_roles()
if role in current_roles:
current_roles.remove(role)
community_follower.set_roles(current_roles)
session.commit()
return True