quoter/src/main.rs
2023-10-16 17:03:57 +03:00

133 lines
4.7 KiB
Rust

use actix_web::{HttpRequest, web, App, HttpResponse, HttpServer, web::Bytes};
use actix_web::middleware::Logger;
use redis::{Client, AsyncCommands};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use futures::StreamExt;
use tokio::sync::broadcast;
use actix_web::error::{ErrorUnauthorized, ErrorInternalServerError as ServerError};
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;
mod data;
#[derive(Clone)]
struct AppState {
tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
redis: Client,
}
#[derive(Serialize, Deserialize)]
struct RedisMessageData {
payload: HashMap<String, String>,
kind: String
}
async fn connect_handler(
req: HttpRequest,
state: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> {
let token = match req.headers().get("Authorization") {
Some(val) => val.to_str()
.unwrap_or("")
.split(" ")
.last()
.unwrap_or(""),
None => return Err(ErrorUnauthorized("Unauthorized")),
};
let listener_id = data::get_auth_id(&token).await.map_err(|e| {
eprintln!("TOKEN check failed: {}", e);
ErrorUnauthorized("Unauthorized")
})?;
let mut con = state.redis.get_async_connection().await.map_err(|e| {
eprintln!("Failed to get async connection: {}", e);
ServerError("Internal Server Error")
})?;
con.sadd::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| {
eprintln!("Failed to add author to online list: {}", e);
ServerError("Internal Server Error")
})?;
let chats: Vec<String> = con.smembers::<String, Vec<String>>(format!("chats_by_author/{}", listener_id)).await.map_err(|e| {
eprintln!("Failed to get chats by author: {}", e);
ServerError("Internal Server Error")
})?;
let (tx, mut rx) = broadcast::channel(100);
let state_clone = state.clone();
let handle = tokio::spawn(async move {
let conn = state_clone.redis.get_async_connection().await.unwrap();
let mut pubsub = conn.into_pubsub();
let followers_channel = format!("new_follower:{}", listener_id);
pubsub.subscribe(followers_channel.clone()).await.unwrap();
println!("'{}' subscribed", followers_channel);
pubsub.subscribe("new_shout").await.unwrap();
println!("'new_shout' subscribed");
pubsub.subscribe("new_reaction").await.unwrap();
println!("'new_reaction' subscribed");
for chat_id in &chats {
let channel_name = format!("chat:{}", chat_id);
pubsub.subscribe(&channel_name).await.unwrap();
println!("'{}' subscribed", channel_name);
}
while let Some(msg) = pubsub.on_message().next().await {
let message_str: String = msg.get_payload().unwrap();
let message_data: RedisMessageData = serde_json::from_str(&message_str).unwrap();
if msg.get_channel_name().starts_with("chat:") || data::is_fitting(listener_id, message_data.kind.to_string(), message_data.payload).await.is_ok() {
let send_result = tx.send(message_str);
if send_result.is_err() {
let _ = con.srem::<&str, &i32, usize>("authors-online", &listener_id).await.map_err(|e| {
eprintln!("Failed to remove author from online list: {}", e);
ServerError("Internal Server Error")
});
break;
}
};
}
});
state.tasks
.lock()
.unwrap()
.insert(format!("{}", listener_id.clone()), handle);
let server_event = rx.recv().await.map_err(|e| {
eprintln!("Failed to receive server event: {}", e);
ServerError("Internal Server Error")
})?;
let server_event_stream = futures::stream::once(async move { Ok::<_, actix_web::Error>(Bytes::from(server_event)) });
Ok(HttpResponse::Ok()
.append_header(("content-type", "text/event-stream"))
.streaming(server_event_stream))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/"));
let client = redis::Client::open(redis_url.clone()).unwrap();
let tasks = Arc::new(Mutex::new(HashMap::new()));
let state = AppState {
tasks: tasks.clone(),
redis: client.clone(),
};
println!("Starting...");
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.app_data(web::Data::new(state.clone()))
.route("/", web::get().to(connect_handler))
})
.bind("0.0.0.0:8080")?
.run()
.await
}