From 7ba39c67f9ca052bf385803992494ae9bbaeca78 Mon Sep 17 00:00:00 2001 From: Tony Rewin Date: Thu, 28 Sep 2023 13:12:07 +0300 Subject: [PATCH] improved-code --- README.md | 4 ++ src/main.rs | 153 ++++++++++++++++++++-------------------------------- 2 files changed, 62 insertions(+), 95 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..39309e4 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +### ENV + + - API_BASE + - REDIS_URL \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index f864fd4..3a9559d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,123 +1,86 @@ -use actix_sse::Sse; -use actix_web::{web, App, HttpServer, Responder}; -use tokio::sync::broadcast::Receiver; +use serde_json::Value; +use sse_actix_web::{broadcast, Broadcaster}; use std::collections::HashMap; -use std::sync::Mutex; use std::env; -use reqwest::Client; -use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast::Receiver; -#[derive(Serialize)] -struct ValidateJWTTokenInput { - token_type: String, - token: String, - roles: Option>, +#[derive(Debug, Deserialize)] +struct Payload { + reader_id: i32, } -#[derive(Deserialize)] -struct ValidateJWTTokenResponse { - is_valid: bool, - claims: HashMap, -} - -#[derive(Deserialize)] -struct ProfileResponse { - data: ProfileData, -} - -#[derive(Deserialize)] -struct ProfileData { - profile: Profile, -} - -#[derive(Deserialize)] -struct Profile { - id: i32, -} -#[derive(Deserialize)] -struct AuthorResponse { - data: AuthorData, -} - -#[derive(Deserialize)] -struct AuthorData { - author: Author, -} - -#[derive(Deserialize)] -struct Author { - id: i32, -} - -async fn get_author_id(user_id: i32) -> Result> { +async fn get_author_id(token: &str) -> Result> { let client = Client::new(); - let query = format!(r#"{{ - author(user: {}) {{ - id - }} - }}"#, user_id); - let api_base = env::var("API_BASE").unwrap(); - let response = client.post(api_base) - .body(query) - .send() - .await?; - let response_body: AuthorResponse = response.json().await?; - Ok(response_body.data.author.id) -} - -async fn get_user_id(token: &str) -> Result> { - let client = Client::new(); - let query = r#"{ profile { id } }"#; - let authorizer_url = env::var("AUTHORIZER_URL").unwrap(); - let response = client.post(authorizer_url) + let gql = r#"mutation { getSession { user { id } } }"#; + let api_base = env::var("API_BASE")?; + let response = client + .post(api_base) .bearer_auth(token) - .body(query) + .body(gql) .send() .await?; - let response_body: ProfileResponse = response.json().await?; - Ok(response_body.data.profile.id) + let response_body: Value = response + .json() + .await + .map_err(|e| format!("Failed to parse response body: {}", e))?; + let id = response_body["data"]["getSession"]["user"]["id"] + .as_i64() + .ok_or("Failed to get user id by token")? as i32; + Ok(id) } -async fn sse_handler(receiver: web::Data>, token: web::Path) -> impl Responder { - let user_id = match get_user_id(&token).await { +async fn sse_handler( + broadcaster: web::Data, + token: web::Path, + rx: web::Data>, +) -> impl Responder { + let author_id = match get_author_id(&token).await { Ok(id) => id, Err(e) => { eprintln!("Failed to validate token: {}", e); return actix_web::HttpResponse::Unauthorized().finish(); } }; - let author_id = match get_author_id(user_id).await.unwrap(); - let mut receivers = receiver.lock().unwrap(); - let redis_url = env::var("REDIS_URL").unwrap(); - let rx = receivers.entry(session_id.into_inner()).or_insert_with(|| { - let (tx, rx) = broadcast::channel(100); - let _handle = tokio::spawn(async move { - let client = redis::Client::open(redis_url).unwrap(); - let mut conn = client.get_async_connection().await.unwrap(); - let mut pubsub = conn.into_pubsub(); - // Subscribe to multiple channels - pubsub.subscribe("new_follower").await.unwrap(); // follow to author - pubsub.subscribe("new_reaction").await.unwrap(); // react on post - pubsub.subscribe("new_shout").await.unwrap(); // post in subscribed topic, author or community - pubsub.subscribe("new_approval").await.unwrap(); // post approved by community - while let Some(msg) = pubsub.on_message().next().await { - let payload: String = msg.get_payload().unwrap(); - tx.send(payload).unwrap(); - } - }); - rx - }); - Sse::new(rx.subscribe()) + let mut stream = rx.into_inner().into_stream(); + + while let Some(Ok(payload)) = stream.next().await { + let payload: Payload = serde_json::from_str(&payload).unwrap(); + if payload.reader_id == author_id { + broadcast(&broadcaster, &payload); + } + } + + "Subscribed to SSE" } #[actix_web::main] async fn main() -> std::io::Result<()> { - let receivers: web::Data>>> = web::Data::new(Mutex::new(HashMap::new())); + let broadcaster = Broadcaster::new(); + + // Create a single Redis Pub/Sub connection and broadcast channel + let (tx, rx) = broadcast::channel(100); + let redis_url = env::var("REDIS_URL").unwrap(); + let _handle = tokio::spawn(async move { + let client = redis::Client::open(redis_url).unwrap(); + let mut conn = client.get_async_connection().await.unwrap(); + let mut pubsub = conn.into_pubsub(); + + pubsub.subscribe("new_follower").await.unwrap(); + pubsub.subscribe("new_reaction").await.unwrap(); + pubsub.subscribe("new_shout").await.unwrap(); + pubsub.subscribe("new_approval").await.unwrap(); + + while let Some(msg) = pubsub.on_message().next().await { + let payload: HashMap = msg.get_payload().unwrap(); + tx.send(serde_json::to_string(&payload).unwrap()).unwrap(); + } + }); HttpServer::new(move || { App::new() - .app_data(receivers.clone()) + .data(broadcaster.clone()) + .data(rx.clone()) .route("/sse/{token}", web::get().to(sse_handler)) }) .bind("127.0.0.1:8080")?