Files
core/orm/community.py

824 lines
31 KiB
Python
Raw Normal View History

2025-07-31 18:55:59 +03:00
import asyncio
2024-10-21 11:29:57 +03:00
import time
2025-07-02 22:30:21 +03:00
from typing import Any, Dict
2023-10-23 17:47:11 +03:00
2025-07-31 18:55:59 +03:00
from sqlalchemy import (
JSON,
Boolean,
ForeignKey,
Index,
Integer,
PrimaryKeyConstraint,
String,
UniqueConstraint,
distinct,
func,
)
2024-10-21 11:48:51 +03:00
from sqlalchemy.ext.hybrid import hybrid_property
2025-07-31 18:55:59 +03:00
from sqlalchemy.orm import Mapped, mapped_column
2024-10-21 11:48:51 +03:00
2025-05-16 09:23:48 +03:00
from auth.orm import Author
2025-07-25 01:04:15 +03:00
from orm.base import BaseModel
2025-07-31 18:55:59 +03:00
from orm.shout import Shout
from services.db import local_session
from services.rbac import (
get_permissions_for_role,
initialize_community_permissions,
user_has_permission,
)
2025-07-02 22:30:21 +03:00
# Словарь названий ролей
role_names = {
"reader": "Читатель",
"author": "Автор",
"artist": "Художник",
"expert": "Эксперт",
"editor": "Редактор",
"admin": "Администратор",
}
# Словарь описаний ролей
role_descriptions = {
"reader": "Может читать и комментировать",
"author": "Может создавать публикации",
"artist": "Может быть credited artist",
"expert": "Может добавлять доказательства",
"editor": "Может модерировать контент",
"admin": "Полные права",
}
2023-10-23 17:47:11 +03:00
2021-08-27 00:14:20 +03:00
2025-07-02 22:30:21 +03:00
class CommunityFollower(BaseModel):
"""
Простая подписка пользователя на сообщество.
2024-10-21 11:29:57 +03:00
2025-07-02 22:30:21 +03:00
Использует обычный id как первичный ключ для простоты и производительности.
Уникальность обеспечивается индексом по (community, follower).
"""
__tablename__ = "community_follower"
2025-07-31 18:55:59 +03:00
community: Mapped[int] = mapped_column(Integer, ForeignKey("community.id"), nullable=False, index=True)
follower: Mapped[int] = mapped_column(Integer, ForeignKey(Author.id), nullable=False, index=True)
created_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
2025-07-02 22:30:21 +03:00
# Уникальность по паре сообщество-подписчик
__table_args__ = (
2025-07-31 18:55:59 +03:00
PrimaryKeyConstraint("community", "follower"),
2025-07-02 22:30:21 +03:00
{"extend_existing": True},
)
2024-10-21 11:48:51 +03:00
2025-07-02 22:30:21 +03:00
def __init__(self, community: int, follower: int) -> None:
2025-07-31 18:55:59 +03:00
self.community = community
self.follower = follower
2021-08-27 00:14:20 +03:00
class Community(BaseModel):
2024-04-17 18:32:23 +03:00
__tablename__ = "community"
2022-09-03 13:50:14 +03:00
2025-07-31 18:55:59 +03:00
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String, nullable=False)
slug: Mapped[str] = mapped_column(String, nullable=False, unique=True)
desc: Mapped[str] = mapped_column(String, nullable=False, default="")
pic: Mapped[str | None] = mapped_column(String, nullable=False, default="")
created_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
created_by: Mapped[int | None] = mapped_column(Integer, nullable=True)
settings: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True)
updated_at: Mapped[int | None] = mapped_column(Integer, nullable=True)
deleted_at: Mapped[int | None] = mapped_column(Integer, nullable=True)
private: Mapped[bool] = mapped_column(Boolean, default=False)
2024-10-21 10:52:23 +03:00
2024-10-21 11:08:16 +03:00
@hybrid_property
def stat(self):
return CommunityStats(self)
2024-10-21 10:52:23 +03:00
def is_followed_by(self, author_id: int) -> bool:
2025-07-02 22:30:21 +03:00
"""Проверяет, подписан ли пользователь на сообщество"""
with local_session() as session:
follower = (
session.query(CommunityFollower)
2025-07-31 18:55:59 +03:00
.where(CommunityFollower.community == self.id, CommunityFollower.follower == author_id)
.first()
)
return follower is not None
2025-07-02 22:30:21 +03:00
def get_user_roles(self, user_id: int) -> list[str]:
"""
Получает роли пользователя в данном сообществе через CommunityAuthor
Args:
user_id: ID пользователя
Returns:
Список ролей пользователя в сообществе
"""
with local_session() as session:
2025-07-02 22:30:21 +03:00
community_author = (
session.query(CommunityAuthor)
2025-07-31 18:55:59 +03:00
.where(CommunityAuthor.community_id == self.id, CommunityAuthor.author_id == user_id)
2025-07-02 22:30:21 +03:00
.first()
)
return community_author.role_list if community_author else []
def has_user_role(self, user_id: int, role_id: str) -> bool:
"""
Проверяет, есть ли у пользователя указанная роль в этом сообществе
Args:
user_id: ID пользователя
role_id: ID роли
Returns:
True если роль есть, False если нет
"""
user_roles = self.get_user_roles(user_id)
return role_id in user_roles
def add_user_role(self, user_id: int, role: str) -> None:
"""
Добавляет роль пользователю в сообществе
Args:
user_id: ID пользователя
role: Название роли
"""
with local_session() as session:
# Ищем существующую запись
community_author = (
session.query(CommunityAuthor)
2025-07-31 18:55:59 +03:00
.where(CommunityAuthor.community_id == self.id, CommunityAuthor.author_id == user_id)
2025-07-02 22:30:21 +03:00
.first()
)
if community_author:
# Добавляем роль к существующей записи
community_author.add_role(role)
else:
# Создаем новую запись
community_author = CommunityAuthor(community_id=self.id, author_id=user_id, roles=role)
session.add(community_author)
session.commit()
def remove_user_role(self, user_id: int, role: str) -> None:
"""
Удаляет роль у пользователя в сообществе
Args:
user_id: ID пользователя
role: Название роли
"""
with local_session() as session:
community_author = (
session.query(CommunityAuthor)
2025-07-31 18:55:59 +03:00
.where(CommunityAuthor.community_id == self.id, CommunityAuthor.author_id == user_id)
2025-07-02 22:30:21 +03:00
.first()
)
if community_author:
community_author.remove_role(role)
# Если ролей не осталось, удаляем запись
if not community_author.role_list:
session.delete(community_author)
session.commit()
def set_user_roles(self, user_id: int, roles: list[str]) -> None:
"""
Устанавливает полный список ролей пользователя в сообществе
Args:
user_id: ID пользователя
roles: Список ролей для установки
"""
with local_session() as session:
# Ищем существующую запись
community_author = (
session.query(CommunityAuthor)
2025-07-31 18:55:59 +03:00
.where(CommunityAuthor.community_id == self.id, CommunityAuthor.author_id == user_id)
.first()
)
2025-07-02 22:30:21 +03:00
if community_author:
if roles:
# Обновляем роли
community_author.set_roles(roles)
else:
# Если ролей нет, удаляем запись
session.delete(community_author)
elif roles:
# Создаем новую запись, если есть роли
community_author = CommunityAuthor(community_id=self.id, author_id=user_id)
community_author.set_roles(roles)
session.add(community_author)
session.commit()
def get_community_members(self, with_roles: bool = False) -> list[dict[str, Any]]:
"""
Получает список участников сообщества
Args:
with_roles: Если True, включает информацию о ролях
Returns:
Список участников с информацией о ролях
"""
with local_session() as session:
2025-07-31 18:55:59 +03:00
community_authors = session.query(CommunityAuthor).where(CommunityAuthor.community_id == self.id).all()
2025-07-02 22:30:21 +03:00
members = []
for ca in community_authors:
member_info = {
"author_id": ca.author_id,
"joined_at": ca.joined_at,
}
if with_roles:
member_info["roles"] = ca.role_list # type: ignore[assignment]
2025-07-02 22:49:20 +03:00
# Получаем разрешения синхронно
try:
member_info["permissions"] = asyncio.run(ca.get_permissions()) # type: ignore[assignment]
except Exception:
# Если не удается получить разрешения асинхронно, используем пустой список
member_info["permissions"] = [] # type: ignore[assignment]
2025-07-02 22:30:21 +03:00
members.append(member_info)
return members
def assign_default_roles_to_user(self, user_id: int) -> None:
"""
Назначает дефолтные роли новому пользователю в сообществе
Args:
user_id: ID пользователя
"""
default_roles = self.get_default_roles()
self.set_user_roles(user_id, default_roles)
def get_default_roles(self) -> list[str]:
"""
Получает список дефолтных ролей для новых пользователей в сообществе
Returns:
Список ID ролей, которые назначаются новым пользователям по умолчанию
"""
if not self.settings:
return ["reader", "author"] # По умолчанию базовые роли
return self.settings.get("default_roles", ["reader", "author"])
def set_default_roles(self, roles: list[str]) -> None:
"""
Устанавливает дефолтные роли для новых пользователей в сообществе
Args:
roles: Список ID ролей для назначения по умолчанию
"""
if not self.settings:
self.settings = {} # type: ignore[assignment]
self.settings["default_roles"] = roles # type: ignore[index]
async def initialize_role_permissions(self) -> None:
"""
Инициализирует права ролей для сообщества из дефолтных настроек.
Вызывается при создании нового сообщества.
"""
await initialize_community_permissions(int(self.id))
def get_available_roles(self) -> list[str]:
"""
Получает список доступных ролей в сообществе
Returns:
Список ID ролей, которые могут быть назначены в этом сообществе
"""
if not self.settings:
return ["reader", "author", "artist", "expert", "editor", "admin"] # Все стандартные роли
return self.settings.get("available_roles", ["reader", "author", "artist", "expert", "editor", "admin"])
def set_available_roles(self, roles: list[str]) -> None:
"""
Устанавливает список доступных ролей в сообществе
Args:
roles: Список ID ролей, доступных в сообществе
"""
if not self.settings:
self.settings = {} # type: ignore[assignment]
self.settings["available_roles"] = roles # type: ignore[index]
def set_slug(self, slug: str) -> None:
"""Устанавливает slug сообщества"""
self.slug = slug # type: ignore[assignment]
2024-10-21 16:59:25 +03:00
2025-07-31 18:55:59 +03:00
def get_followers(self):
"""
Получает список подписчиков сообщества.
Returns:
list: Список ID авторов, подписанных на сообщество
"""
with local_session() as session:
return [
follower.id
for follower in session.query(Author)
.join(CommunityFollower, Author.id == CommunityFollower.follower)
.where(CommunityFollower.community == self.id)
.all()
]
def add_community_creator(self, author_id: int) -> None:
"""
Создатель сообщества
Args:
author_id: ID пользователя, которому назначаются права
"""
with local_session() as session:
# Проверяем существование связи
existing = CommunityAuthor.find_author_in_community(author_id, self.id, session)
if not existing:
# Создаем нового CommunityAuthor с ролью редактора
community_author = CommunityAuthor(community_id=self.id, author_id=author_id, roles="editor")
session.add(community_author)
session.commit()
2024-10-21 10:52:23 +03:00
2024-10-21 11:08:16 +03:00
class CommunityStats:
def __init__(self, community) -> None:
2024-10-21 11:08:16 +03:00
self.community = community
2024-10-21 10:52:23 +03:00
2024-10-21 11:08:16 +03:00
@property
2025-07-31 18:55:59 +03:00
def shouts(self) -> int:
return self.community.session.query(func.count(Shout.id)).where(Shout.community == self.community.id).scalar()
2024-10-21 11:08:16 +03:00
@property
2025-07-31 18:55:59 +03:00
def followers(self) -> int:
2024-10-21 11:29:57 +03:00
return (
self.community.session.query(func.count(CommunityFollower.follower))
2025-07-31 18:55:59 +03:00
.where(CommunityFollower.community == self.community.id)
2024-10-21 11:08:16 +03:00
.scalar()
2024-10-21 11:29:57 +03:00
)
2024-10-21 16:42:30 +03:00
@property
2025-07-31 18:55:59 +03:00
def authors(self) -> int:
2024-10-21 16:57:03 +03:00
# author has a shout with community id and its featured_at is not null
2024-10-21 16:42:30 +03:00
return (
self.community.session.query(func.count(distinct(Author.id)))
.join(Shout)
2025-07-31 18:55:59 +03:00
.where(
2025-05-16 09:23:48 +03:00
Shout.community == self.community.id,
Shout.featured_at.is_not(None),
Author.id.in_(Shout.authors),
)
2024-10-21 16:42:30 +03:00
.scalar()
)
2024-10-21 16:59:25 +03:00
class CommunityAuthor(BaseModel):
2025-07-02 22:30:21 +03:00
"""
Связь автора с сообществом и его ролями.
Attributes:
id: Уникальный ID записи
community_id: ID сообщества
author_id: ID автора
roles: CSV строка с ролями (например: "reader,author,editor")
joined_at: Время присоединения к сообществу (unix timestamp)
"""
2024-10-21 16:59:25 +03:00
__tablename__ = "community_author"
2025-07-31 18:55:59 +03:00
id: Mapped[int] = mapped_column(Integer, primary_key=True)
community_id: Mapped[int] = mapped_column(Integer, ForeignKey("community.id"), nullable=False)
author_id: Mapped[int] = mapped_column(Integer, ForeignKey(Author.id), nullable=False)
roles: Mapped[str | None] = mapped_column(String, nullable=True, comment="Roles (comma-separated)")
joined_at: Mapped[int] = mapped_column(Integer, nullable=False, default=lambda: int(time.time()))
2025-07-02 22:30:21 +03:00
# Уникальность по сообществу и автору
__table_args__ = (
Index("idx_community_author_community", "community_id"),
Index("idx_community_author_author", "author_id"),
UniqueConstraint("community_id", "author_id", name="uq_community_author"),
{"extend_existing": True},
)
2024-10-21 16:59:25 +03:00
@property
2025-07-02 22:30:21 +03:00
def role_list(self) -> list[str]:
"""Получает список ролей как список строк"""
return [role.strip() for role in self.roles.split(",") if role.strip()] if self.roles else []
2024-10-21 16:59:25 +03:00
@role_list.setter
2025-07-02 22:30:21 +03:00
def role_list(self, value: list[str]) -> None:
"""Устанавливает список ролей из списка строк"""
self.roles = ",".join(value) if value else None # type: ignore[assignment]
2025-07-02 22:30:21 +03:00
2025-07-31 18:55:59 +03:00
def add_role(self, role: str) -> None:
2025-07-02 22:30:21 +03:00
"""
2025-07-31 18:55:59 +03:00
Добавляет роль в список ролей.
2025-07-02 22:30:21 +03:00
Args:
2025-07-31 18:55:59 +03:00
role (str): Название роли
2025-07-02 22:30:21 +03:00
"""
2025-07-31 18:55:59 +03:00
if not self.roles:
self.roles = role
elif role not in self.role_list:
self.roles += f",{role}"
2025-07-02 22:30:21 +03:00
2025-07-31 18:55:59 +03:00
def remove_role(self, role: str) -> None:
2025-07-02 22:30:21 +03:00
"""
2025-07-31 18:55:59 +03:00
Удаляет роль из списка ролей.
2025-07-02 22:30:21 +03:00
Args:
2025-07-31 18:55:59 +03:00
role (str): Название роли
2025-07-02 22:30:21 +03:00
"""
2025-07-31 18:55:59 +03:00
if self.roles and role in self.role_list:
roles_list = [r for r in self.role_list if r != role]
self.roles = ",".join(roles_list) if roles_list else None
2025-07-02 22:30:21 +03:00
2025-07-31 18:55:59 +03:00
def has_role(self, role: str) -> bool:
2025-07-02 22:30:21 +03:00
"""
2025-07-31 18:55:59 +03:00
Проверяет наличие роли.
2025-07-02 22:30:21 +03:00
Args:
2025-07-31 18:55:59 +03:00
role (str): Название роли
Returns:
bool: True, если роль есть, иначе False
2025-07-02 22:30:21 +03:00
"""
2025-07-31 18:55:59 +03:00
return bool(self.roles and role in self.role_list)
2025-07-02 22:30:21 +03:00
def set_roles(self, roles: list[str]) -> None:
"""
2025-07-25 01:04:15 +03:00
Устанавливает роли для CommunityAuthor.
2025-07-02 22:30:21 +03:00
Args:
roles: Список ролей для установки
"""
2025-07-25 01:04:15 +03:00
# Фильтруем и очищаем роли
valid_roles = [role.strip() for role in roles if role and role.strip()]
2025-07-31 18:55:59 +03:00
# Если список пустой, устанавливаем пустую строку
2025-07-25 01:04:15 +03:00
self.roles = ",".join(valid_roles) if valid_roles else ""
2025-07-02 22:30:21 +03:00
async def get_permissions(self) -> list[str]:
"""
Получает все разрешения автора на основе его ролей в конкретном сообществе
Returns:
Список разрешений (permissions)
"""
all_permissions = set()
for role in self.role_list:
role_perms = await get_permissions_for_role(role, int(self.community_id))
all_permissions.update(role_perms)
return list(all_permissions)
2025-07-31 18:55:59 +03:00
def has_permission(
self, permission: str | None = None, resource: str | None = None, operation: str | None = None
) -> bool:
2025-07-02 22:30:21 +03:00
"""
Проверяет наличие разрешения у автора
Args:
permission: Разрешение для проверки (например: "shout:create")
2025-07-31 18:55:59 +03:00
resource: Опциональный ресурс (для обратной совместимости)
operation: Опциональная операция (для обратной совместимости)
2025-07-02 22:30:21 +03:00
Returns:
True если разрешение есть, False если нет
"""
2025-07-31 18:55:59 +03:00
# Если передан полный permission, используем его
if permission and ":" in permission:
return any(permission == role for role in self.role_list)
# Если переданы resource и operation, формируем permission
if resource and operation:
full_permission = f"{resource}:{operation}"
return any(full_permission == role for role in self.role_list)
return False
2025-07-02 22:30:21 +03:00
def dict(self, access: bool = False) -> dict[str, Any]:
"""
Сериализует объект в словарь
Args:
access: Если True, включает дополнительную информацию
Returns:
Словарь с данными объекта
"""
result = {
"id": self.id,
"community_id": self.community_id,
"author_id": self.author_id,
"roles": self.role_list,
"joined_at": self.joined_at,
}
if access:
# Note: permissions должны быть получены заранее через await
# Здесь мы не можем использовать await в sync методе
result["permissions"] = [] # Placeholder - нужно получить асинхронно
return result
@classmethod
def get_user_communities_with_roles(cls, author_id: int, session=None) -> list[Dict[str, Any]]:
"""
Получает все сообщества пользователя с его ролями
Args:
author_id: ID автора
session: Сессия БД (опционально)
Returns:
Список словарей с информацией о сообществах и ролях
"""
if session is None:
with local_session() as ssession:
return cls.get_user_communities_with_roles(author_id, ssession)
2025-07-31 18:55:59 +03:00
community_authors = session.query(cls).where(cls.author_id == author_id).all()
2025-07-02 22:30:21 +03:00
return [
{
"community_id": ca.community_id,
"roles": ca.role_list,
"permissions": [], # Нужно получить асинхронно
"joined_at": ca.joined_at,
}
for ca in community_authors
]
@classmethod
2025-07-31 18:55:59 +03:00
def find_author_in_community(cls, author_id: int, community_id: int, session=None) -> "CommunityAuthor | None":
2025-07-02 22:30:21 +03:00
"""
Находит запись CommunityAuthor по ID автора и сообщества
Args:
author_id: ID автора
community_id: ID сообщества
session: Сессия БД (опционально)
Returns:
CommunityAuthor или None
"""
if session is None:
with local_session() as ssession:
2025-07-31 18:55:59 +03:00
return ssession.query(cls).where(cls.author_id == author_id, cls.community_id == community_id).first()
2025-07-02 22:30:21 +03:00
2025-07-31 18:55:59 +03:00
return session.query(cls).where(cls.author_id == author_id, cls.community_id == community_id).first()
2025-07-02 22:30:21 +03:00
@classmethod
def get_users_with_role(cls, community_id: int, role: str, session=None) -> list[int]:
"""
Получает список ID пользователей с указанной ролью в сообществе
Args:
community_id: ID сообщества
role: Название роли
session: Сессия БД (опционально)
Returns:
Список ID пользователей
"""
if session is None:
with local_session() as ssession:
return cls.get_users_with_role(community_id, role, ssession)
2025-07-31 18:55:59 +03:00
community_authors = session.query(cls).where(cls.community_id == community_id).all()
2025-07-02 22:30:21 +03:00
return [ca.author_id for ca in community_authors if ca.has_role(role)]
@classmethod
def get_community_stats(cls, community_id: int, session=None) -> Dict[str, Any]:
"""
Получает статистику ролей в сообществе
Args:
community_id: ID сообщества
session: Сессия БД (опционально)
Returns:
Словарь со статистикой ролей
"""
if session is None:
with local_session() as s:
return cls.get_community_stats(community_id, s)
2025-07-31 18:55:59 +03:00
community_authors = session.query(cls).where(cls.community_id == community_id).all()
2025-07-02 22:30:21 +03:00
role_counts: dict[str, int] = {}
total_members = len(community_authors)
for ca in community_authors:
for role in ca.role_list:
role_counts[role] = role_counts.get(role, 0) + 1
return {
"total_members": total_members,
"role_counts": role_counts,
"roles_distribution": {
role: count / total_members if total_members > 0 else 0 for role, count in role_counts.items()
},
}
# === HELPER ФУНКЦИИ ДЛЯ РАБОТЫ С РОЛЯМИ ===
def get_user_roles_in_community(author_id: int, community_id: int = 1) -> list[str]:
"""
Удобная функция для получения ролей пользователя в сообществе
Args:
author_id: ID автора
community_id: ID сообщества (по умолчанию 1)
Returns:
Список ролей пользователя
"""
with local_session() as session:
2025-07-31 18:55:59 +03:00
ca = CommunityAuthor.find_author_in_community(author_id, community_id, session)
2025-07-02 22:30:21 +03:00
return ca.role_list if ca else []
async def check_user_permission_in_community(author_id: int, permission: str, community_id: int = 1) -> bool:
"""
Проверяет разрешение пользователя в сообществе с учетом иерархии ролей
Args:
author_id: ID автора
permission: Разрешение для проверки
community_id: ID сообщества (по умолчанию 1)
Returns:
True если разрешение есть, False если нет
"""
return await user_has_permission(author_id, permission, community_id)
def assign_role_to_user(author_id: int, role: str, community_id: int = 1) -> bool:
"""
Назначает роль пользователю в сообществе
Args:
author_id: ID автора
role: Название роли
community_id: ID сообщества (по умолчанию 1)
Returns:
True если роль была добавлена, False если уже была
"""
with local_session() as session:
2025-07-31 18:55:59 +03:00
ca = CommunityAuthor.find_author_in_community(author_id, community_id, session)
2025-07-02 22:30:21 +03:00
if ca:
if ca.has_role(role):
return False # Роль уже есть
ca.add_role(role)
else:
# Создаем новую запись
ca = CommunityAuthor(community_id=community_id, author_id=author_id, roles=role)
session.add(ca)
session.commit()
return True
def remove_role_from_user(author_id: int, role: str, community_id: int = 1) -> bool:
"""
Удаляет роль у пользователя в сообществе
Args:
author_id: ID автора
role: Название роли
community_id: ID сообщества (по умолчанию 1)
Returns:
True если роль была удалена, False если её не было
"""
with local_session() as session:
2025-07-31 18:55:59 +03:00
ca = CommunityAuthor.find_author_in_community(author_id, community_id, session)
2025-07-02 22:30:21 +03:00
if ca and ca.has_role(role):
ca.remove_role(role)
# Если ролей не осталось, удаляем запись
if not ca.role_list:
session.delete(ca)
session.commit()
return True
return False
def migrate_old_roles_to_community_author():
"""
Функция миграции для переноса ролей из старой системы в CommunityAuthor
[непроверенное] Предполагает, что старые роли хранились в auth.orm.AuthorRole
"""
with local_session() as session:
# Получаем все старые роли
old_roles = session.query(AuthorRole).all()
print(f"[миграция] Найдено {len(old_roles)} старых записей ролей")
# Группируем по автору и сообществу
user_community_roles = {}
for role in old_roles:
key = (role.author, role.community)
if key not in user_community_roles:
user_community_roles[key] = []
# Извлекаем базовое имя роли (убираем суффикс сообщества если есть)
role_name = role.role
2025-07-31 18:55:59 +03:00
base_role = role_name.split("-")[0] if (isinstance(role_name, str) and "-" in role_name) else role_name
2025-07-02 22:30:21 +03:00
if base_role not in user_community_roles[key]:
user_community_roles[key].append(base_role)
# Создаем новые записи CommunityAuthor
migrated_count = 0
for (author_id, community_id), roles in user_community_roles.items():
# Проверяем, есть ли уже запись
2025-07-31 18:55:59 +03:00
existing = CommunityAuthor.find_author_in_community(author_id, community_id, session)
2025-07-02 22:30:21 +03:00
if not existing:
ca = CommunityAuthor(community_id=community_id, author_id=author_id)
ca.set_roles(roles)
session.add(ca)
migrated_count += 1
else:
print(f"[миграция] Запись для автора {author_id} в сообществе {community_id} уже существует")
session.commit()
print(f"[миграция] Создано {migrated_count} новых записей CommunityAuthor")
print("[миграция] Миграция завершена. Проверьте результаты перед удалением старых таблиц.")
# === CRUD ОПЕРАЦИИ ДЛЯ RBAC ===
def get_all_community_members_with_roles(community_id: int = 1) -> list[dict[str, Any]]:
"""
Получает всех участников сообщества с их ролями и разрешениями
Args:
community_id: ID сообщества
Returns:
Список участников с полной информацией
"""
with local_session() as session:
2025-07-31 18:55:59 +03:00
community = session.query(Community).where(Community.id == community_id).first()
2025-07-02 22:30:21 +03:00
if not community:
return []
return community.get_community_members(with_roles=True)
def bulk_assign_roles(user_role_pairs: list[tuple[int, str]], community_id: int = 1) -> dict[str, int]:
"""
Массовое назначение ролей пользователям
Args:
user_role_pairs: Список кортежей (author_id, role)
community_id: ID сообщества
Returns:
Статистика операции в формате {"success": int, "failed": int}
"""
success_count = 0
failed_count = 0
for author_id, role in user_role_pairs:
try:
if assign_role_to_user(author_id, role, community_id):
success_count += 1
else:
# Если роль уже была, считаем это успехом
success_count += 1
except Exception as e:
print(f"[ошибка] Не удалось назначить роль {role} пользователю {author_id}: {e}")
failed_count += 1
return {"success": success_count, "failed": failed_count}