diff --git a/README.md b/README.md index 22b4d1b..2c07ff3 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,14 @@ ### Как это работает -Сервис подписывается на Redus PubSub каналы - - `new_reaction`, - - `new_follower:`, +При каждом обращении к `/connect` создаётся отдельная на Redus PubSub каналы + - `new_reaction` - `new_shout` - - `chat:` - - Сервис пересылает из этих каналов те сообщения, которые предназначены пользователю, который подписался на SSE по адресу `/connect` токеном авторизации в заголовке `Authorization` \ No newline at end of file + - `followers:` + - `chat:` + +После подписки на эти каналы, сервис начинает пересылать сообщения из этих каналов. Он пересылает только те сообщения, которые предназначены пользователю, подписавшемуся на Server-Sent Events (SSE) по адресу `/connect`. Для авторизации подписки используется токен, который передается в заголовке `Authorization`. + +Таким образом, приложение обеспечивает реализацию механизма подписки и пересылки сообщений, позволяя пользователям получать только те уведомления, которые предназначены непосредственно для них. + +При завершении подключения, все подписки автоматически отменяются, так как они связаны с конкретным подключением. Если пользователь снова подключается, процесс подписки повторяется. \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index 3fdc392..35fdc07 100644 --- a/src/data.rs +++ b/src/data.rs @@ -109,11 +109,6 @@ pub async fn is_fitting( payload: HashMap, ) -> Result { match &kind[0..9] { - "new_follo" => { - // payload is Author, kind is new_follower: - let author_id = kind.split(":").last().unwrap(); - Ok(author_id.to_string() == listener_id.to_string()) - } "new_react" => { // payload is Reaction, kind is new_reaction let shout_id = payload.get("shout").unwrap(); diff --git a/src/main.rs b/src/main.rs index 3121810..170de43 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,26 @@ -use actix_web::{HttpRequest, web, App, HttpResponse, HttpServer, web::Bytes}; +use actix_web::error::{ErrorInternalServerError as ServerError, ErrorUnauthorized}; use actix_web::middleware::Logger; -use redis::{Client, AsyncCommands}; +use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer}; +use futures::StreamExt; +use redis::{AsyncCommands, Client}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env; -use futures::StreamExt; -use tokio::sync::broadcast; -use actix_web::error::{ErrorUnauthorized, ErrorInternalServerError as ServerError}; use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; use tokio::task::JoinHandle; mod data; - #[derive(Clone)] struct AppState { tasks: Arc>>>, redis: Client, } - #[derive(Serialize, Deserialize)] struct RedisMessageData { payload: HashMap, - kind: String + kind: String, } async fn connect_handler( @@ -30,11 +28,7 @@ async fn connect_handler( state: web::Data, ) -> Result { let token = match req.headers().get("Authorization") { - Some(val) => val.to_str() - .unwrap_or("") - .split(" ") - .last() - .unwrap_or(""), + Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""), None => return Err(ErrorUnauthorized("Unauthorized")), }; let listener_id = data::get_auth_id(&token).await.map_err(|e| { @@ -47,22 +41,27 @@ async fn connect_handler( ServerError("Internal Server Error") })?; - con.sadd::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| { - eprintln!("Failed to add author to online list: {}", e); - ServerError("Internal Server Error") - })?; + con.sadd::<&str, &i32, usize>("authors-online", &listener_id) + .await + .map_err(|e| { + eprintln!("Failed to add author to online list: {}", e); + ServerError("Internal Server Error") + })?; - let chats: Vec = con.smembers::>(format!("chats_by_author/{}", listener_id)).await.map_err(|e| { - eprintln!("Failed to get chats by author: {}", e); - ServerError("Internal Server Error") - })?; + let chats: Vec = con + .smembers::>(format!("chats_by_author/{}", listener_id)) + .await + .map_err(|e| { + eprintln!("Failed to get chats by author: {}", e); + ServerError("Internal Server Error") + })?; let (tx, mut rx) = broadcast::channel(100); let state_clone = state.clone(); let handle = tokio::spawn(async move { let conn = state_clone.redis.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); - let followers_channel = format!("new_follower:{}", listener_id); + let followers_channel = format!("followers:{}", listener_id); pubsub.subscribe(followers_channel.clone()).await.unwrap(); println!("'{}' subscribed", followers_channel); pubsub.subscribe("new_shout").await.unwrap(); @@ -79,19 +78,33 @@ async fn connect_handler( while let Some(msg) = pubsub.on_message().next().await { let message_str: String = msg.get_payload().unwrap(); let message_data: RedisMessageData = serde_json::from_str(&message_str).unwrap(); - if msg.get_channel_name().starts_with("chat:") || data::is_fitting(listener_id, message_data.kind.to_string(), message_data.payload).await.is_ok() { + if msg.get_channel_name().starts_with("chat:") + || msg.get_channel_name().starts_with("followers:") + || data::is_fitting( + listener_id, + message_data.kind.to_string(), + message_data.payload, + ) + .await + .is_ok() + { let send_result = tx.send(message_str); if send_result.is_err() { - let _ = con.srem::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| { - eprintln!("Failed to remove author from online list: {}", e); - ServerError("Internal Server Error") - }); + // remove author from online list + let _ = con + .srem::<&str, &i32, usize>("authors-online", &listener_id) + .await + .map_err(|e| { + eprintln!("Failed to remove author from online list: {}", e); + ServerError("Internal Server Error") + }); break; } }; } }); - state.tasks + state + .tasks .lock() .unwrap() .insert(format!("{}", listener_id.clone()), handle); @@ -101,14 +114,14 @@ async fn connect_handler( ServerError("Internal Server Error") })?; - let server_event_stream = futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) }); + let server_event_stream = + futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) }); Ok(HttpResponse::Ok() .append_header(("content-type", "text/event-stream")) .streaming(server_event_stream)) } - #[actix_web::main] async fn main() -> std::io::Result<()> { let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/")); @@ -128,5 +141,4 @@ async fn main() -> std::io::Result<()> { .bind("0.0.0.0:8080")? .run() .await -} - +} \ No newline at end of file