diff --git a/Cargo.lock b/Cargo.lock index 9b6cba1..28bac70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,6 +487,7 @@ name = "discoursio-presense" version = "0.2.8" dependencies = [ "actix-web", + "bytes", "chrono", "futures", "redis", diff --git a/Cargo.toml b/Cargo.toml index 0331c58..01a8162 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,10 +7,11 @@ edition = "2021" [dependencies] actix-web = "4.4.0" +tokio = { version = "1", features = ["full"]} redis = { version = "0.23", features = ["tokio-comp"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1", features = ["full"] } +bytes = "1.0" reqwest = { version = "0.11", features = ["json"] } futures = "0.3.28" uuid = { version = "1.4.1", features = ["v4"] } diff --git a/src/main.rs b/src/main.rs index 1886e23..eb1c588 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,15 +3,15 @@ use redis::{Client, AsyncCommands}; use std::collections::HashMap; use std::env; use futures::StreamExt; -use tokio::sync::broadcast::{self, Receiver}; +use tokio::sync::broadcast; -pub mod data; +mod data; async fn sse_handler( token: web::Path, - rx: web::Data>, redis: web::Data, ) -> impl Responder { + let author_id = match data::get_auth_id(&token).await { Ok(id) => id, Err(e) => { @@ -62,34 +62,30 @@ async fn sse_handler( } }; - let mut pubsub = con.into_pubsub(); - for chat_id in &chats { - if let Err(e) = pubsub.subscribe(format!("chat:{}", chat_id)).await { - eprintln!("Failed to subscribe to chat: {}", e); - return HttpResponse::InternalServerError().finish(); - } - } + let (tx, mut rx) = broadcast::channel(100); + let _handle = tokio::spawn(async move { + let conn = redis.get_async_connection().await.expect("Failed to get async connection"); + let mut pubsub = conn.into_pubsub(); - let mut con = match redis.get_async_connection().await { - Ok(con) => con, - Err(e) => { - eprintln!("Failed to get async connection: {}", e); - return HttpResponse::InternalServerError().finish(); + pubsub.subscribe("new_follower").await.expect("Failed to subscribe to new_follower"); + pubsub.subscribe("new_shout").await.expect("Failed to subscribe to new_shout"); + pubsub.subscribe("new_reaction").await.expect("Failed to subscribe to new_reaction"); + for chat_id in &chats { + let channel_name = format!("chat:{}", chat_id); + pubsub + .subscribe(channel_name.clone()) + .await + .expect(&format!("Failed to subscribe to {}", channel_name)); } - }; - let _ = match con.srem::<&str, &i32, usize>("authors-online", &author_id).await { - Ok(_) => (), - Err(e) => { - eprintln!("Failed to remove author from online list: {}", e); - return HttpResponse::InternalServerError().finish(); + while let Some(msg) = pubsub.on_message().next().await { + let payload: HashMap = msg.get_payload().expect("Failed to get payload"); + tx.clone().send(serde_json::to_string(&payload).expect("Failed to serialize payload")).expect("Failed to send payload"); } - }; + }); + - // FIXME: cannot borrow data in an `Arc` as mutable - // trait `DerefMut` is required to modify through a dereference, - // but it is not implemented for `Arc>` - let server_event = match rx.recv().await { + let server_event = match rx.recv().await { Ok(event) => event, Err(e) => { eprintln!("Failed to receive server event: {}", e); @@ -105,28 +101,11 @@ async fn sse_handler( #[actix_web::main] async fn main() -> std::io::Result<()> { - let (tx, _rx) = broadcast::channel(100); let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); let client = redis::Client::open(redis_url).expect("Failed to open Redis client"); - let _handle = tokio::spawn(async move { - let mut conn = client.get_async_connection().await.expect("Failed to get async connection"); - let mut pubsub = conn.into_pubsub(); - - pubsub.subscribe("new_follower").await.expect("Failed to subscribe to new_follower"); - pubsub.subscribe("new_shout").await.expect("Failed to subscribe to new_shout"); - pubsub.subscribe("new_reaction").await.expect("Failed to subscribe to new_reaction"); - - while let Some(msg) = pubsub.on_message().next().await { - let payload: HashMap = msg.get_payload().expect("Failed to get payload"); - tx.send(serde_json::to_string(&payload).expect("Failed to serialize payload")).expect("Failed to send payload"); - } - }); - HttpServer::new(move || { - let rx = tx.subscribe(); App::new() - .app_data(web::Data::new(rx)) .app_data(web::Data::new(client.clone())) .route("/presence/{token}", web::get().to(sse_handler)) })