use serde_json::Value; use sse_actix_web::{broadcast, Broadcaster}; use std::collections::HashMap; use std::env; use tokio::sync::broadcast::Receiver; #[derive(Debug, Deserialize)] struct Payload { reader_id: i32, } async fn get_author_id(token: &str) -> Result> { let client = Client::new(); let gql = r#"mutation { getSession { user { id } } }"#; let api_base = env::var("API_BASE")?; let response = client .post(api_base) .bearer_auth(token) .body(gql) .send() .await?; let response_body: Value = response .json() .await .map_err(|e| format!("Failed to parse response body: {}", e))?; let id = response_body["data"]["getSession"]["user"]["id"] .as_i64() .ok_or("Failed to get user id by token")? as i32; Ok(id) } async fn sse_handler( broadcaster: web::Data, token: web::Path, rx: web::Data>, ) -> impl Responder { let author_id = match get_author_id(&token).await { Ok(id) => id, Err(e) => { eprintln!("Failed to validate token: {}", e); return actix_web::HttpResponse::Unauthorized().finish(); } }; let mut stream = rx.into_inner().into_stream(); while let Some(Ok(payload)) = stream.next().await { let payload: Payload = serde_json::from_str(&payload).unwrap(); if payload.reader_id == author_id { broadcast(&broadcaster, &payload); } } "Subscribed to SSE" } #[actix_web::main] async fn main() -> std::io::Result<()> { let broadcaster = Broadcaster::new(); // Create a single Redis Pub/Sub connection and broadcast channel let (tx, rx) = broadcast::channel(100); let redis_url = env::var("REDIS_URL").unwrap(); let _handle = tokio::spawn(async move { let client = redis::Client::open(redis_url).unwrap(); let mut conn = client.get_async_connection().await.unwrap(); let mut pubsub = conn.into_pubsub(); pubsub.subscribe("new_follower").await.unwrap(); pubsub.subscribe("new_reaction").await.unwrap(); pubsub.subscribe("new_shout").await.unwrap(); pubsub.subscribe("new_approval").await.unwrap(); while let Some(msg) = pubsub.on_message().next().await { let payload: HashMap = msg.get_payload().unwrap(); tx.send(serde_json::to_string(&payload).unwrap()).unwrap(); } }); HttpServer::new(move || { App::new() .data(broadcaster.clone()) .data(rx.clone()) .route("/sse/{token}", web::get().to(sse_handler)) }) .bind("127.0.0.1:8080")? .run() .await }