diff --git a/Cargo.lock b/Cargo.lock index 28bac70..a14d045 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,21 +232,6 @@ dependencies = [ "alloc-no-stdlib", ] -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "async-trait" version = "0.1.73" @@ -364,20 +349,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" -dependencies = [ - "android-tzdata", - "iana-time-zone", - "js-sys", - "num-traits", - "wasm-bindgen", - "windows-targets", -] - [[package]] name = "combine" version = "4.6.6" @@ -483,19 +454,16 @@ dependencies = [ ] [[package]] -name = "discoursio-presense" +name = "discoursio-presence" version = "0.2.8" dependencies = [ "actix-web", - "bytes", - "chrono", "futures", "redis", "reqwest", "serde", "serde_json", "tokio", - "uuid", ] [[package]] @@ -792,29 +760,6 @@ dependencies = [ "tokio-native-tls", ] -[[package]] -name = "iana-time-zone" -version = "0.1.57" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "windows", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" -dependencies = [ - "cc", -] - [[package]] name = "idna" version = "0.4.0" @@ -973,15 +918,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "num-traits" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" -dependencies = [ - "autocfg", -] - [[package]] name = "num_cpus" version = "1.16.0" @@ -1639,15 +1575,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "uuid" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" -dependencies = [ - "getrandom", -] - [[package]] name = "vcpkg" version = "0.2.15" @@ -1773,15 +1700,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" -dependencies = [ - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 01a8162..bbe77aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "discoursio-presense" +name = "discoursio-presence" version = "0.2.8" edition = "2021" @@ -11,11 +11,8 @@ tokio = { version = "1", features = ["full"]} redis = { version = "0.23", features = ["tokio-comp"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -bytes = "1.0" reqwest = { version = "0.11", features = ["json"] } futures = "0.3.28" -uuid = { version = "1.4.1", features = ["v4"] } -chrono = "0.4" [[bin]] name = "presense" diff --git a/README.md b/README.md index 7ad4eeb..da4ff14 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,14 @@ +## Presence + +"Присутствие" - это сервер для пересылки сообщений в реальном времени. Текущая версия использует SSE-транспорт. + + ### ENV - API_BASE - REDIS_URL + + +### Как это работает + +Сервис подписывается на Redus PubSub каналы `new_reaction`, `new_follower`, `new_shout` и `chat:` и пересылает из них те сообщения, которые предназначены пользователю, который подписался на SSE по адресу `/presence/` \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index 7307cc7..5056e8d 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,35 +1,8 @@ - use reqwest::Client as HTTPClient; -use serde::{Serialize, Deserialize}; use serde_json::Value; +use std::collections::HashMap; use std::error::Error; use std::env; -use uuid::Uuid; -use chrono::Utc; - - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -enum PayloadKind { - NewMessage, - NewFollower, - NewShout, - NewApproval, - NewComment, - NewRate, -} - -#[derive(Debug, Serialize, Deserialize)] -struct Payload { - chat_id: Option, - shout_id: Option, - author_id: Option, - topic_id: Option, - reaction_id: Option, - community_id: Option, - kind: PayloadKind, - body: String, -} pub async fn get_auth_id(token: &str) -> Result> { let api_base = env::var("API_BASE")?; @@ -51,33 +24,69 @@ pub async fn get_auth_id(token: &str) -> Result> { Ok(id) } -pub async fn create_first_chat(author_id: i32, con: &mut redis::aio::Connection) -> Result, Box> { - let chat_id = Uuid::new_v4().to_string(); - let members = vec![author_id.to_string(), "1".to_string()]; - let timestamp = Utc::now().timestamp(); - let chat = serde_json::json!({ - "id": chat_id.clone(), - "admins": members.clone(), - "members": members.clone(), - "title": "", - "createdBy": author_id, - "createdAt": timestamp, - "updatedAt": timestamp, - }); - - let _: () = redis::pipe() - .atomic() - .cmd("SADD") - .arg(format!("chats_by_author/{}", author_id)) - .arg(&chat_id) - .ignore() - .set(format!("chats/{}", chat_id), chat.to_string()) - .ignore() - .set(format!("chats/{}/next_message_id", chat_id), "0") - .ignore() - .query_async(con) +async fn get_shout_followers(shout_id: &str) -> Result, Box> { + let api_base = env::var("API_BASE")?; + let gql = format!(r#" + query {{ + shoutFollowers(shout: "{}") {{ + follower {{ + id + }} + }} + }} + "#, shout_id); + let client = reqwest::Client::new(); + let response = client + .post(&api_base) + .body(gql) + .send() .await?; - Ok(vec![chat_id]) + let response_body: serde_json::Value = response.json().await?; + + let ids: Vec = response_body["data"]["shoutFollowers"] + .as_array() + .ok_or("Failed to parse follower array")? + .iter() + .filter_map(|f| f["follower"]["id"].as_i64().map(|id| id as i32)) + .collect(); + + Ok(ids) +} + + +pub async fn is_fitting(listener_id: i32, payload: HashMap) -> Result { + match payload.get("kind") { + Some(kind) => { + match kind.as_str() { + "new_follower" => { + // payload is AuthorFollower + Ok(payload.get("author").unwrap().to_string() == listener_id.to_string()) + }, + "new_reaction" => { + // payload is Reaction + let shout_id = payload.get("shout").unwrap(); + let recipients = get_shout_followers(shout_id).await.unwrap(); + + Ok(recipients.contains(&listener_id)) + }, + "new_shout" => { + // payload is Shout + // TODO: check all community subscribers if no then + // check all topics subscribers if no then + // check all authors subscribers + Ok(true) + }, + "new_message" => { + // payload is Chat + let members_str = payload.get("members").unwrap(); + let members = serde_json::from_str::>(members_str).unwrap(); + Ok(members.contains(&listener_id.to_string())) + }, + _ => Err("Invalid kind"), + } + }, + None => Err("No kind provided"), + } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index eb1c588..a064f48 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,113 +1,81 @@ -use actix_web::{web, App, HttpResponse, HttpServer, Responder, web::Bytes}; +use actix_web::{web, App, HttpResponse, HttpServer, web::Bytes}; use redis::{Client, AsyncCommands}; use std::collections::HashMap; use std::env; use futures::StreamExt; use tokio::sync::broadcast; +use actix_web::error::{ErrorUnauthorized, ErrorInternalServerError as ServerError}; mod data; async fn sse_handler( token: web::Path, redis: web::Data, -) -> impl Responder { +) -> Result { + let listener_id = data::get_auth_id(&token).await.map_err(|e| { + eprintln!("TOKEN check failed: {}", e); + ErrorUnauthorized("Unauthorized") + })?; - let author_id = match data::get_auth_id(&token).await { - Ok(id) => id, - Err(e) => { - eprintln!("TOKEN check failed: {}", e); - return HttpResponse::Unauthorized().finish(); - } - }; + let mut con = redis.get_async_connection().await.map_err(|e| { + eprintln!("Failed to get async connection: {}", e); + ServerError("Internal Server Error") + })?; - let mut con = match redis.get_async_connection().await { - Ok(con) => con, - Err(e) => { - eprintln!("Failed to get async connection: {}", e); - return HttpResponse::InternalServerError().finish(); - } - }; + con.sadd::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| { + eprintln!("Failed to add author to online list: {}", e); + ServerError("Internal Server Error") + })?; - let _ = match con.sadd::<&str, &i32, usize>("authors-online", &author_id).await { - Ok(_) => (), - Err(e) => { - eprintln!("Failed to add author to online list: {}", e); - return HttpResponse::InternalServerError().finish(); - } - }; - - let chats: Vec = match con.smembers::>(format!("chats_by_author/{}", author_id)).await { - Ok(chats) => { - if chats.is_empty() { - match data::create_first_chat(author_id, &mut con).await { - Ok(chat) => chat, - Err(e) => { - eprintln!("Failed to create first chat: {}", e); - return HttpResponse::InternalServerError().finish(); - } - } - } else { - chats - } - }, - Err(e) => { - eprintln!("Failed to get chats by author: {}", e); - match data::create_first_chat(author_id, &mut con).await { - Ok(chat) => chat, - Err(e) => { - eprintln!("Failed to create first chat: {}", e); - return HttpResponse::InternalServerError().finish(); - } - } - } - }; + let chats: Vec = con.smembers::>(format!("chats_by_author/{}", listener_id)).await.map_err(|e| { + eprintln!("Failed to get chats by author: {}", e); + ServerError("Internal Server Error") + })?; let (tx, mut rx) = broadcast::channel(100); let _handle = tokio::spawn(async move { - let conn = redis.get_async_connection().await.expect("Failed to get async connection"); + let conn = redis.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); - pubsub.subscribe("new_follower").await.expect("Failed to subscribe to new_follower"); - pubsub.subscribe("new_shout").await.expect("Failed to subscribe to new_shout"); - pubsub.subscribe("new_reaction").await.expect("Failed to subscribe to new_reaction"); + pubsub.subscribe("new_follower").await.unwrap(); + pubsub.subscribe("new_shout").await.unwrap(); + pubsub.subscribe("new_reaction").await.unwrap(); + for chat_id in &chats { let channel_name = format!("chat:{}", chat_id); - pubsub - .subscribe(channel_name.clone()) - .await - .expect(&format!("Failed to subscribe to {}", channel_name)); + pubsub.subscribe(&channel_name).await.unwrap(); } while let Some(msg) = pubsub.on_message().next().await { - let payload: HashMap = msg.get_payload().expect("Failed to get payload"); - tx.clone().send(serde_json::to_string(&payload).expect("Failed to serialize payload")).expect("Failed to send payload"); + let payload: HashMap = msg.get_payload().unwrap(); + if data::is_fitting(listener_id, payload.clone()).await.is_ok() { + let _ = tx.send(serde_json::to_string(&payload).unwrap()); + }; } }); - - let server_event = match rx.recv().await { - Ok(event) => event, - Err(e) => { - eprintln!("Failed to receive server event: {}", e); - return HttpResponse::InternalServerError().finish(); - } - }; + let server_event = rx.recv().await.map_err(|e| { + eprintln!("Failed to receive server event: {}", e); + ServerError("Internal Server Error") + })?; + let server_event_stream = futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) }); - HttpResponse::Ok() + Ok(HttpResponse::Ok() .append_header(("content-type", "text/event-stream")) - .streaming(server_event_stream) + .streaming(server_event_stream)) } #[actix_web::main] async fn main() -> std::io::Result<()> { - let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); - let client = redis::Client::open(redis_url).expect("Failed to open Redis client"); + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/")); + let client = redis::Client::open(redis_url).unwrap(); HttpServer::new(move || { App::new() .app_data(web::Data::new(client.clone())) - .route("/presence/{token}", web::get().to(sse_handler)) + .route("/connect", web::get().to(sse_handler)) + .route("/disconnect", web::get().to(sse_handler)) }) .bind("127.0.0.1:8080")? .run()