diff --git a/Cargo.lock b/Cargo.lock index d314a85..ba2b83a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -459,12 +459,12 @@ version = "0.2.14" dependencies = [ "actix-web", "futures", - "rand", "redis", "reqwest", "serde", "serde_json", "tokio", + "uuid", ] [[package]] @@ -1576,6 +1576,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 4a28883..b3a3e33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" reqwest = { version = "0.11", features = ["json"] } futures = "0.3.28" -rand = "0.8.5" +uuid = { version = "1.5.0", features = ["v4"] } [[bin]] name = "presence" diff --git a/README.md b/README.md index 1857bb2..2569a14 100644 --- a/README.md +++ b/README.md @@ -12,14 +12,23 @@ ### Как это работает -При каждом обращении к `/connect` создаётся отдельная асинхронная задача с подписками на Redus PubSub каналы - - `new_reaction` - - `new_shout` - - `followers:` +При каждом обращении к `/connect` создаётся отдельная асинхронная задача с подписками на Redus PubSub каналы, позволяя пользователям получать только те уведомления, которые предназначены непосредственно для них. + +Каналы Redis: + + - `reaction` + - `shout` + - `follower:` - `chat:` -После подписки на эти каналы, сервис начинает пересылать сообщения из этих каналов. Он пересылает только те сообщения, которые предназначены пользователю, подписавшемуся на Server-Sent Events (SSE) по адресу `/connect`. Для авторизации подписки используется токен, который передается в заголовке `Authorization`. +Сервис пересылает сообщения из этих каналов, которые предназначены пользователю, подписавшемуся на Server-Sent Events (SSE) по адресу `/connect`. Для авторизации подписки используется токен, который передается клиентом в заголовке `Authorization`, или в пути `/connect/{token}`, или в переменной запроса `/connect/?token={token}`. -Таким образом, приложение обеспечивает реализацию механизма подписки и пересылки сообщений, позволяя пользователям получать только те уведомления, которые предназначены непосредственно для них. +При завершении подключения, все подписки автоматически отменяются, так как они связаны с конкретным подключением. Если пользователь снова подключается, процесс подписки повторяется. -При завершении подключения, все подписки автоматически отменяются, так как они связаны с конкретным подключением. Если пользователь снова подключается, процесс подписки повторяется. \ No newline at end of file + +### Формат сообщений межсервисной коммуникации + +Между сервисами пересылаются целые сущности и типизация действий с ними, поля стандартного redis-сообщения: + +- `action` наименование операции, примеры: "create" | "delete" | "update" | "join" | "left" +- `payload` json одной из сущностей: Reaction | Shout | Author | Chat | Message \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index e69834f..4ef9d44 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,10 +1,12 @@ use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}; use reqwest::Client as HTTPClient; -use serde_json::{json, Value}; +use serde_json::json; use std::collections::HashMap; use std::env; use std::error::Error; +use crate::SSEMessageData; + pub async fn get_auth_id(token: &str) -> Result> { let auth_api_base = env::var("AUTH_URL")?; let (query_name, query_type) = match auth_api_base.contains("auth.discours.io") { @@ -105,32 +107,29 @@ async fn get_shout_followers(shout_id: &str) -> Result, Box> pub async fn is_fitting( listener_id: i32, - kind: String, - payload: HashMap, + message_data: SSEMessageData, ) -> Result { - match &kind[0..9] { - "new_react" => { - // payload is Reaction, kind is new_reaction - let shout_id = payload.get("shout").unwrap().as_str().unwrap(); - let recipients = get_shout_followers(shout_id).await.unwrap(); + if message_data.entity == "reaction" { + // payload is Reaction + let shout_id = message_data.payload.get("shout").unwrap().as_str().unwrap(); + let recipients = get_shout_followers(shout_id).await.unwrap(); - Ok(recipients.contains(&listener_id)) - } - "new_shout" => { - // payload is Shout, kind is "new_shout" - // TODO: check all community subscribers if no then - // check all topics subscribers if no then - // check all authors subscribers - Ok(true) - } - "new_messa" => { - println!("own message passed"); - Ok(false) - }, - _ => { - eprintln!("unknown payload kind"); - eprintln!("{:?}", payload); - Ok(false) - } + Ok(recipients.contains(&listener_id)) + } else if message_data.entity == "shout" { + // payload is Shout + // TODO: check all community subscribers if no then + // TODO: check all topics subscribers if no then + // TODO: check all authors subscribers + Ok(true) + } else if message_data.entity == "chat" { + // payload is Message or Chat + Ok(true) + } else if message_data.entity == "follower" { + // payload is Author + Ok(true) + }else { + eprintln!("[data] unknown entity"); + eprintln!("{:?}", message_data); + Ok(false) } } diff --git a/src/main.rs b/src/main.rs index 3073263..5695811 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use futures::StreamExt; use redis::{AsyncCommands, Client}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use uuid::Uuid; use std::collections::HashMap; use std::env; use std::sync::{Arc, Mutex}; @@ -21,14 +22,20 @@ struct AppState { #[derive(Serialize, Deserialize, Clone, Debug)] struct RedisMessageData { payload: HashMap, - kind: String, + action: String +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct SSEMessageData { + payload: HashMap, + action: String, + entity: String } async fn connect_handler( req: HttpRequest, state: web::Data, ) -> Result { - let handler_id: u64 = rand::random(); let token = match req.headers().get("Authorization") { Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""), @@ -71,13 +78,13 @@ async fn connect_handler( let handle = tokio::spawn(async move { let conn = state_clone.redis.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); - let followers_channel = format!("followers:{}", listener_id); + let followers_channel = format!("follower:{}", listener_id); pubsub.subscribe(followers_channel.clone()).await.unwrap(); - println!("'{}' subscribed", followers_channel); - pubsub.subscribe("new_shout").await.unwrap(); - println!("'new_shout' subscribed"); - pubsub.subscribe("new_reaction").await.unwrap(); - println!("'new_reaction' subscribed"); + println!("'{}' pubsub subscribed", followers_channel); + pubsub.subscribe("shout").await.unwrap(); + println!("'shout' pubsub subscribed"); + pubsub.subscribe("reaction").await.unwrap(); + println!("'reaction' pubsub subscribed"); for chat_id in &chats { let channel_name = format!("chat:{}", chat_id); @@ -86,23 +93,27 @@ async fn connect_handler( } while let Some(msg) = pubsub.on_message().next().await { - let message_str: String = msg.get_payload().unwrap(); - let message_data: RedisMessageData = serde_json::from_str(&message_str).unwrap(); - let message_author = message_data.payload.get("author") - .and_then(Value::as_i64) // Convert Value to i64 - .unwrap_or(-1) as i32; // Convert i64 to i32 - - if (msg.get_channel_name().starts_with("chat:") && message_author != listener_id) - || msg.get_channel_name().starts_with("followers:") - || data::is_fitting( + let redis_message_str: String = msg.get_payload().unwrap(); + let redis_message_data: RedisMessageData = serde_json::from_str(&redis_message_str).unwrap(); + let prepared_message_data = SSEMessageData { + payload: redis_message_data.payload, + action: redis_message_data.action, + entity: msg.get_channel_name() + .to_owned() + .split(":") + .next() + .unwrap_or("") + .to_string() + }; + if data::is_fitting( listener_id, - message_data.kind.to_string(), - message_data.payload, + prepared_message_data.clone(), ) .await .is_ok() { - let send_result = tx.send(message_str.clone()); + let prepared_message_str = serde_json::to_string(&prepared_message_data).unwrap(); + let send_result = tx.send(prepared_message_str.clone()); if send_result.is_err() { // remove author from online list let _ = con @@ -114,7 +125,7 @@ async fn connect_handler( }); break; } else { - println!("[handler {}] message handled {}", handler_id, message_str); + println!("[handler] message handled {}", prepared_message_str); } }; } @@ -125,16 +136,28 @@ async fn connect_handler( .unwrap() .insert(format!("{}", listener_id.clone()), handle); - let server_event_stream = futures::stream::unfold(rx, |mut rx| async { - let result = rx.recv().await; - match result { - Ok(server_event) => { - let formatted_server_event = format!("data: {}\n\n", server_event); - Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx)) - }, - Err(_) => None, - } - }); + let server_event_stream = futures::stream::unfold(rx, |mut rx| async { + let result = rx.recv().await; + match result { + Ok(server_event) => { + let message_data: SSEMessageData = serde_json::from_str(&server_event).unwrap(); + let event_entity = message_data.entity; // Assuming 'entity' field represents the entity type + let event_action = message_data.action; // 'action' field represents the operation type + let event_id = format!("{}", Uuid::new_v4()); // Generate a random UUID as the event ID + + let formatted_server_event = format!( + "id: {}\nevent: {}\ndata: {{\"action\": \"{}\", \"payload\": {}}}\n\n", + event_id, + event_entity, + event_action, + server_event + ); + + Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx)) + }, + Err(_) => None, + } + }); Ok(HttpResponse::Ok() .append_header(("content-type", "text/event-stream"))