sse-format

This commit is contained in:
Untone 2023-10-19 17:43:00 +03:00
parent 8473694757
commit 5cf79bf531
5 changed files with 106 additions and 66 deletions

11
Cargo.lock generated
View File

@ -459,12 +459,12 @@ version = "0.2.14"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"futures", "futures",
"rand",
"redis", "redis",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"uuid",
] ]
[[package]] [[package]]
@ -1576,6 +1576,15 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "uuid"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.15" version = "0.2.15"

View File

@ -13,7 +13,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] } reqwest = { version = "0.11", features = ["json"] }
futures = "0.3.28" futures = "0.3.28"
rand = "0.8.5" uuid = { version = "1.5.0", features = ["v4"] }
[[bin]] [[bin]]
name = "presence" name = "presence"

View File

@ -12,14 +12,23 @@
### Как это работает ### Как это работает
При каждом обращении к `/connect` создаётся отдельная асинхронная задача с подписками на Redus PubSub каналы При каждом обращении к `/connect` создаётся отдельная асинхронная задача с подписками на Redus PubSub каналы, позволяя пользователям получать только те уведомления, которые предназначены непосредственно для них.
- `new_reaction`
- `new_shout` Каналы Redis:
- `followers:<author_id>`
- `reaction`
- `shout`
- `follower:<author_id>`
- `chat:<chat_id>` - `chat:<chat_id>`
После подписки на эти каналы, сервис начинает пересылать сообщения из этих каналов. Он пересылает только те сообщения, которые предназначены пользователю, подписавшемуся на Server-Sent Events (SSE) по адресу `/connect`. Для авторизации подписки используется токен, который передается в заголовке `Authorization`. Сервис пересылает сообщения из этих каналов, которые предназначены пользователю, подписавшемуся на Server-Sent Events (SSE) по адресу `/connect`. Для авторизации подписки используется токен, который передается клиентом в заголовке `Authorization`, или в пути `/connect/{token}`, или в переменной запроса `/connect/?token={token}`.
Таким образом, приложение обеспечивает реализацию механизма подписки и пересылки сообщений, позволяя пользователям получать только те уведомления, которые предназначены непосредственно для них. При завершении подключения, все подписки автоматически отменяются, так как они связаны с конкретным подключением. Если пользователь снова подключается, процесс подписки повторяется.
При завершении подключения, все подписки автоматически отменяются, так как они связаны с конкретным подключением. Если пользователь снова подключается, процесс подписки повторяется.
### Формат сообщений межсервисной коммуникации
Между сервисами пересылаются целые сущности и типизация действий с ними, поля стандартного redis-сообщения:
- `action` наименование операции, примеры: "create" | "delete" | "update" | "join" | "left"
- `payload` json одной из сущностей: Reaction | Shout | Author | Chat | Message

View File

@ -1,10 +1,12 @@
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use reqwest::Client as HTTPClient; use reqwest::Client as HTTPClient;
use serde_json::{json, Value}; use serde_json::json;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::error::Error; use std::error::Error;
use crate::SSEMessageData;
pub async fn get_auth_id(token: &str) -> Result<i32, Box<dyn Error>> { pub async fn get_auth_id(token: &str) -> Result<i32, Box<dyn Error>> {
let auth_api_base = env::var("AUTH_URL")?; let auth_api_base = env::var("AUTH_URL")?;
let (query_name, query_type) = match auth_api_base.contains("auth.discours.io") { let (query_name, query_type) = match auth_api_base.contains("auth.discours.io") {
@ -105,32 +107,29 @@ async fn get_shout_followers(shout_id: &str) -> Result<Vec<i32>, Box<dyn Error>>
pub async fn is_fitting( pub async fn is_fitting(
listener_id: i32, listener_id: i32,
kind: String, message_data: SSEMessageData,
payload: HashMap<String, Value>,
) -> Result<bool, &'static str> { ) -> Result<bool, &'static str> {
match &kind[0..9] { if message_data.entity == "reaction" {
"new_react" => { // payload is Reaction
// payload is Reaction, kind is new_reaction<reaction_kind> let shout_id = message_data.payload.get("shout").unwrap().as_str().unwrap();
let shout_id = payload.get("shout").unwrap().as_str().unwrap(); let recipients = get_shout_followers(shout_id).await.unwrap();
let recipients = get_shout_followers(shout_id).await.unwrap();
Ok(recipients.contains(&listener_id)) Ok(recipients.contains(&listener_id))
} } else if message_data.entity == "shout" {
"new_shout" => { // payload is Shout
// payload is Shout, kind is "new_shout" // TODO: check all community subscribers if no then
// TODO: check all community subscribers if no then // TODO: check all topics subscribers if no then
// check all topics subscribers if no then // TODO: check all authors subscribers
// check all authors subscribers Ok(true)
Ok(true) } else if message_data.entity == "chat" {
} // payload is Message or Chat
"new_messa" => { Ok(true)
println!("own message passed"); } else if message_data.entity == "follower" {
Ok(false) // payload is Author
}, Ok(true)
_ => { }else {
eprintln!("unknown payload kind"); eprintln!("[data] unknown entity");
eprintln!("{:?}", payload); eprintln!("{:?}", message_data);
Ok(false) Ok(false)
}
} }
} }

View File

@ -5,6 +5,7 @@ use futures::StreamExt;
use redis::{AsyncCommands, Client}; use redis::{AsyncCommands, Client};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use uuid::Uuid;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -21,14 +22,20 @@ struct AppState {
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
struct RedisMessageData { struct RedisMessageData {
payload: HashMap<String, Value>, payload: HashMap<String, Value>,
kind: String, action: String
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SSEMessageData {
payload: HashMap<String, Value>,
action: String,
entity: String
} }
async fn connect_handler( async fn connect_handler(
req: HttpRequest, req: HttpRequest,
state: web::Data<AppState>, state: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> { ) -> Result<HttpResponse, actix_web::Error> {
let handler_id: u64 = rand::random();
let token = match req.headers().get("Authorization") { let token = match req.headers().get("Authorization") {
Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""), Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""),
@ -71,13 +78,13 @@ async fn connect_handler(
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
let conn = state_clone.redis.get_async_connection().await.unwrap(); let conn = state_clone.redis.get_async_connection().await.unwrap();
let mut pubsub = conn.into_pubsub(); let mut pubsub = conn.into_pubsub();
let followers_channel = format!("followers:{}", listener_id); let followers_channel = format!("follower:{}", listener_id);
pubsub.subscribe(followers_channel.clone()).await.unwrap(); pubsub.subscribe(followers_channel.clone()).await.unwrap();
println!("'{}' subscribed", followers_channel); println!("'{}' pubsub subscribed", followers_channel);
pubsub.subscribe("new_shout").await.unwrap(); pubsub.subscribe("shout").await.unwrap();
println!("'new_shout' subscribed"); println!("'shout' pubsub subscribed");
pubsub.subscribe("new_reaction").await.unwrap(); pubsub.subscribe("reaction").await.unwrap();
println!("'new_reaction' subscribed"); println!("'reaction' pubsub subscribed");
for chat_id in &chats { for chat_id in &chats {
let channel_name = format!("chat:{}", chat_id); let channel_name = format!("chat:{}", chat_id);
@ -86,23 +93,27 @@ async fn connect_handler(
} }
while let Some(msg) = pubsub.on_message().next().await { while let Some(msg) = pubsub.on_message().next().await {
let message_str: String = msg.get_payload().unwrap(); let redis_message_str: String = msg.get_payload().unwrap();
let message_data: RedisMessageData = serde_json::from_str(&message_str).unwrap(); let redis_message_data: RedisMessageData = serde_json::from_str(&redis_message_str).unwrap();
let message_author = message_data.payload.get("author") let prepared_message_data = SSEMessageData {
.and_then(Value::as_i64) // Convert Value to i64 payload: redis_message_data.payload,
.unwrap_or(-1) as i32; // Convert i64 to i32 action: redis_message_data.action,
entity: msg.get_channel_name()
if (msg.get_channel_name().starts_with("chat:") && message_author != listener_id) .to_owned()
|| msg.get_channel_name().starts_with("followers:") .split(":")
|| data::is_fitting( .next()
.unwrap_or("")
.to_string()
};
if data::is_fitting(
listener_id, listener_id,
message_data.kind.to_string(), prepared_message_data.clone(),
message_data.payload,
) )
.await .await
.is_ok() .is_ok()
{ {
let send_result = tx.send(message_str.clone()); let prepared_message_str = serde_json::to_string(&prepared_message_data).unwrap();
let send_result = tx.send(prepared_message_str.clone());
if send_result.is_err() { if send_result.is_err() {
// remove author from online list // remove author from online list
let _ = con let _ = con
@ -114,7 +125,7 @@ async fn connect_handler(
}); });
break; break;
} else { } else {
println!("[handler {}] message handled {}", handler_id, message_str); println!("[handler] message handled {}", prepared_message_str);
} }
}; };
} }
@ -125,16 +136,28 @@ async fn connect_handler(
.unwrap() .unwrap()
.insert(format!("{}", listener_id.clone()), handle); .insert(format!("{}", listener_id.clone()), handle);
let server_event_stream = futures::stream::unfold(rx, |mut rx| async { let server_event_stream = futures::stream::unfold(rx, |mut rx| async {
let result = rx.recv().await; let result = rx.recv().await;
match result { match result {
Ok(server_event) => { Ok(server_event) => {
let formatted_server_event = format!("data: {}\n\n", server_event); let message_data: SSEMessageData = serde_json::from_str(&server_event).unwrap();
Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx)) let event_entity = message_data.entity; // Assuming 'entity' field represents the entity type
}, let event_action = message_data.action; // 'action' field represents the operation type
Err(_) => None, let event_id = format!("{}", Uuid::new_v4()); // Generate a random UUID as the event ID
}
}); let formatted_server_event = format!(
"id: {}\nevent: {}\ndata: {{\"action\": \"{}\", \"payload\": {}}}\n\n",
event_id,
event_entity,
event_action,
server_event
);
Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx))
},
Err(_) => None,
}
});
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.append_header(("content-type", "text/event-stream")) .append_header(("content-type", "text/event-stream"))