core/services/auth.py

204 lines
9.0 KiB
Python
Raw Normal View History

2024-01-25 19:41:27 +00:00
from functools import wraps
2025-05-16 06:23:48 +00:00
from typing import Tuple
2024-04-08 07:38:58 +00:00
2025-05-21 15:29:46 +00:00
from starlette.requests import Request
2025-05-22 01:34:30 +00:00
from cache.cache import get_cached_author_by_id
2024-08-09 06:37:06 +00:00
from resolvers.stat import get_with_stat
from utils.logger import root_logger as logger
2025-05-16 06:23:48 +00:00
from auth.internal import verify_internal_auth
from sqlalchemy import exc
from services.db import local_session
from auth.orm import Author, Role
2025-05-21 15:29:46 +00:00
from settings import SESSION_TOKEN_HEADER
2024-01-13 08:49:12 +00:00
2024-12-24 11:04:52 +00:00
# Список разрешенных заголовков
2025-01-21 07:09:28 +00:00
ALLOWED_HEADERS = ["Authorization", "Content-Type"]
2024-02-21 07:27:16 +00:00
2025-05-21 15:29:46 +00:00
async def check_auth(req: Request) -> Tuple[str, list[str], bool]:
2024-12-11 20:49:58 +00:00
"""
Проверка авторизации пользователя.
2025-05-16 06:23:48 +00:00
Проверяет токен и получает данные из локальной БД.
2024-12-11 20:49:58 +00:00
Параметры:
- req: Входящий GraphQL запрос, содержащий заголовок авторизации.
Возвращает:
2025-05-16 06:23:48 +00:00
- user_id: str - Идентификатор пользователя
- user_roles: list[str] - Список ролей пользователя
2025-05-20 22:34:02 +00:00
- is_admin: bool - Флаг наличия у пользователя административных прав
2024-12-11 20:49:58 +00:00
"""
2025-05-21 15:29:46 +00:00
logger.debug(f"[check_auth] Проверка авторизации...")
# Получаем заголовок авторизации
token = None
# Проверяем заголовок с учетом регистра
headers_dict = dict(req.headers.items())
logger.debug(f"[check_auth] Все заголовки: {headers_dict}")
# Ищем заголовок Authorization независимо от регистра
for header_name, header_value in headers_dict.items():
if header_name.lower() == SESSION_TOKEN_HEADER.lower():
token = header_value
logger.debug(f"[check_auth] Найден заголовок {header_name}: {token[:10]}...")
break
2025-05-16 06:23:48 +00:00
if not token:
2025-05-21 15:29:46 +00:00
logger.debug(f"[check_auth] Токен не найден в заголовках")
2025-05-20 22:34:02 +00:00
return "", [], False
2025-01-21 07:09:28 +00:00
2025-05-16 06:23:48 +00:00
# Очищаем токен от префикса Bearer если он есть
if token.startswith("Bearer "):
token = token.split("Bearer ")[-1].strip()
2024-12-11 20:49:58 +00:00
2025-05-16 06:23:48 +00:00
# Проверяем авторизацию внутренним механизмом
2025-05-21 15:29:46 +00:00
logger.debug("[check_auth] Вызов verify_internal_auth...")
user_id, user_roles, is_admin = await verify_internal_auth(token)
logger.debug(f"[check_auth] Результат verify_internal_auth: user_id={user_id}, roles={user_roles}, is_admin={is_admin}")
2025-05-20 22:34:02 +00:00
2025-05-21 15:29:46 +00:00
# Если в ролях нет админа, но есть ID - проверяем в БД
if user_id and not is_admin:
try:
with local_session() as session:
# Преобразуем user_id в число
try:
user_id_int = int(user_id.strip())
except (ValueError, TypeError):
logger.error(f"Невозможно преобразовать user_id {user_id} в число")
else:
# Проверяем наличие админских прав через БД
from auth.orm import AuthorRole
admin_role = session.query(AuthorRole).filter(
AuthorRole.author == user_id_int,
AuthorRole.role.in_(["admin", "super"])
).first()
is_admin = admin_role is not None
except Exception as e:
logger.error(f"Ошибка при проверке прав администратора: {e}")
2025-05-20 22:34:02 +00:00
return user_id, user_roles, is_admin
2023-10-23 14:47:11 +00:00
2025-05-16 06:23:48 +00:00
async def add_user_role(user_id: str, roles: list[str] = None):
"""
Добавление ролей пользователю в локальной БД.
2024-01-25 19:41:27 +00:00
2025-05-16 06:23:48 +00:00
Args:
user_id: ID пользователя
roles: Список ролей для добавления. По умолчанию ["author", "reader"]
2024-12-11 20:49:58 +00:00
"""
2025-05-16 06:23:48 +00:00
if not roles:
roles = ["author", "reader"]
2024-12-11 20:49:58 +00:00
2025-05-16 06:23:48 +00:00
logger.info(f"Adding roles {roles} to user {user_id}")
2024-12-11 20:49:58 +00:00
2025-05-16 06:23:48 +00:00
logger.debug("Using local authentication")
with local_session() as session:
try:
author = session.query(Author).filter(Author.id == user_id).one()
2024-12-11 20:49:58 +00:00
2025-05-16 06:23:48 +00:00
# Получаем существующие роли
existing_roles = set(role.name for role in author.roles)
# Добавляем новые роли
for role_name in roles:
if role_name not in existing_roles:
# Получаем или создаем роль
role = session.query(Role).filter(Role.name == role_name).first()
if not role:
role = Role(id=role_name, name=role_name)
session.add(role)
# Добавляем роль автору
author.roles.append(role)
session.commit()
return user_id
except exc.NoResultFound:
logger.error(f"Author {user_id} not found")
return None
def login_required(f):
2025-05-20 22:34:02 +00:00
"""Декоратор для проверки авторизации пользователя. Требуется наличие роли 'reader'."""
2024-12-11 22:04:11 +00:00
2024-01-23 18:59:46 +00:00
@wraps(f)
2023-10-23 14:47:11 +00:00
async def decorated_function(*args, **kwargs):
2025-05-20 22:34:02 +00:00
from graphql.error import GraphQLError
2023-10-23 14:47:11 +00:00
info = args[1]
2024-04-17 15:32:23 +00:00
req = info.context.get("request")
2025-05-21 15:29:46 +00:00
logger.debug(f"[login_required] Проверка авторизации для запроса: {req.method} {req.url.path}")
logger.debug(f"[login_required] Заголовки: {req.headers}")
2025-05-20 22:34:02 +00:00
user_id, user_roles, is_admin = await check_auth(req)
if not user_id:
2025-05-21 15:29:46 +00:00
logger.debug(f"[login_required] Пользователь не авторизован, {dict(req)}, {info}")
2025-05-20 22:34:02 +00:00
raise GraphQLError("Требуется авторизация")
# Проверяем наличие роли reader
2025-05-21 15:29:46 +00:00
if 'reader' not in user_roles:
2025-05-20 22:34:02 +00:00
logger.error(f"Пользователь {user_id} не имеет роли 'reader'")
raise GraphQLError("У вас нет необходимых прав для доступа")
logger.info(f"Авторизован пользователь {user_id} с ролями: {user_roles}")
info.context["roles"] = user_roles
# Проверяем права администратора
info.context["is_admin"] = is_admin
2025-05-22 01:34:30 +00:00
author = await get_cached_author_by_id(user_id, get_with_stat)
2025-05-20 22:34:02 +00:00
if not author:
logger.error(f"Профиль автора не найден для пользователя {user_id}")
info.context["author"] = author
2024-02-27 11:16:54 +00:00
return await f(*args, **kwargs)
2023-10-23 14:47:11 +00:00
return decorated_function
2024-11-12 14:56:20 +00:00
def login_accepted(f):
2025-05-16 06:23:48 +00:00
"""Декоратор для добавления данных авторизации в контекст."""
2024-12-11 22:04:11 +00:00
2024-11-12 14:56:20 +00:00
@wraps(f)
async def decorated_function(*args, **kwargs):
info = args[1]
req = info.context.get("request")
2024-11-18 08:31:19 +00:00
2024-11-14 11:00:33 +00:00
logger.debug("login_accepted: Проверка авторизации пользователя.")
2025-05-20 22:34:02 +00:00
user_id, user_roles, is_admin = await check_auth(req)
2024-11-14 11:00:33 +00:00
logger.debug(f"login_accepted: user_id={user_id}, user_roles={user_roles}")
2024-11-18 08:31:19 +00:00
2024-11-12 14:56:20 +00:00
if user_id and user_roles:
2024-11-14 11:00:33 +00:00
logger.info(f"login_accepted: Пользователь авторизован: {user_id} с ролями {user_roles}")
2024-11-12 14:56:20 +00:00
info.context["roles"] = user_roles
2025-05-20 22:34:02 +00:00
# Проверяем права администратора
info.context["is_admin"] = is_admin
2024-11-12 14:56:20 +00:00
# Пробуем получить профиль автора
2025-05-22 01:34:30 +00:00
author = await get_cached_author_by_id(user_id, get_with_stat)
2024-11-14 11:00:33 +00:00
if author:
logger.debug(f"login_accepted: Найден профиль автора: {author}")
2025-05-20 22:34:02 +00:00
# Используем флаг is_admin из контекста или передаем права владельца для собственных данных
is_owner = True # Пользователь всегда является владельцем собственного профиля
info.context["author"] = author.dict(access=is_owner or is_admin)
2024-11-14 11:00:33 +00:00
else:
2024-11-18 08:31:19 +00:00
logger.error(
2024-12-16 16:39:31 +00:00
f"login_accepted: Профиль автора не найден для пользователя {user_id}. Используем базовые данные."
2025-05-16 06:23:48 +00:00
)
2024-11-12 14:56:20 +00:00
else:
2024-11-14 11:00:33 +00:00
logger.debug("login_accepted: Пользователь не авторизован. Очищаем контекст.")
2024-11-12 14:56:20 +00:00
info.context["roles"] = None
info.context["author"] = None
2025-05-20 22:34:02 +00:00
info.context["is_admin"] = False
return await f(*args, **kwargs)
2024-11-12 14:56:20 +00:00
2025-05-20 22:34:02 +00:00
return decorated_function