quoter-init
Some checks failed
deploy / deploy (push) Failing after 3s

This commit is contained in:
2024-08-30 21:05:51 +03:00
parent d14e5457f3
commit 8e9387b95d
7 changed files with 1334 additions and 411 deletions

View File

@@ -1,194 +0,0 @@
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE};
use reqwest::Client as HTTPClient;
use serde_json::json;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use crate::SSEMessageData;
async fn get_author_id(user: &str) -> Result<i32, Box<dyn Error>> {
let api_base = env::var("API_BASE")?;
let query_name = "get_author_id";
let operation = "GetAuthorId";
let mut headers = HeaderMap::new();
// headers.insert(AUTHORIZATION, HeaderValue::from_str(token)?);
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let mut variables = HashMap::<String, String>::new();
variables.insert("user".to_string(), user.to_string());
let gql = json!({
"query": format!("query {}($user: String!) {{ {}(user: $user){{ id }} }}", operation, query_name),
"operationName": operation,
"variables": variables
});
// println!("[get_author_id] GraphQL: {}", gql);
let client = HTTPClient::new();
let response = client
.post(&api_base)
.headers(headers)
.json(&gql)
.send()
.await?;
if response.status().is_success() {
let r: HashMap<String, serde_json::Value> = response.json().await?;
let author_id = r
.get("data")
.and_then(|data| data.get(query_name))
.and_then(|claims| claims.get("id"))
.and_then(|id| id.as_i64());
match author_id {
Some(id) => {
println!("Author ID retrieved: {}", id);
Ok(id as i32)
}
None => Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"No author ID found in the response",
))),
}
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Request failed with status: {}", response.status()),
)))
}
}
pub async fn get_id_by_token(token: &str) -> Result<i32, Box<dyn Error>> {
let auth_api_base = env::var("AUTH_URL")?;
let query_name = "validate_jwt_token";
let operation = "ValidateToken";
let mut headers = HeaderMap::new();
// headers.insert(AUTHORIZATION, HeaderValue::from_str(token)?);
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let mut variables = HashMap::<String, HashMap<String, String>>::new();
let mut params = HashMap::<String, String>::new();
params.insert("token".to_string(), token.to_string());
params.insert("token_type".to_string(), "access_token".to_string());
variables.insert("params".to_string(), params);
let gql = json!({
"query": format!("query {}($params: ValidateJWTTokenInput!) {{ {}(params: $params) {{ is_valid claims }} }}", operation, query_name),
"operationName": operation,
"variables": variables
});
println!("[get_id_by_token] GraphQL Query: {}", gql);
let client = HTTPClient::new();
let response = client
.post(&auth_api_base)
.headers(headers)
.json(&gql)
.send()
.await?;
if response.status().is_success() {
let r: HashMap<String, serde_json::Value> = response.json().await?;
let user_id = r
.get("data")
.and_then(|data| data.get(query_name))
.and_then(|query| query.get("claims"))
.and_then(|claims| claims.get("sub"))
.and_then(|id| id.as_str())
.map(|id| id.trim());
match user_id {
Some(id) => {
println!("[get_id_by_token] User ID retrieved: {}", id);
let author_id = get_author_id(id).await?;
Ok(author_id as i32)
}
None => {
println!("[get_id_by_token] No user ID found in the response");
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"No user ID found in the response",
)))
}
}
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Request failed with status: {}", response.status()),
)))
}
}
async fn get_shout_followers(shout_id: &str) -> Result<Vec<i32>, Box<dyn Error>> {
let api_base = env::var("API_BASE")?;
let query = r#"query GetShoutFollowers($slug: String, shout_id: Int) {
get_shout_followers(slug: $slug, shout_id: $shout_id) { id }
}
"#;
let shout_id = shout_id.parse::<i32>()?;
let variables = json!({
"shout": shout_id
});
let body = json!({
"query": query,
"operationName": "GetShoutFollowers",
"variables": variables
});
let client = reqwest::Client::new();
let response = client.post(&api_base).json(&body).send().await?;
if response.status().is_success() {
let response_body: serde_json::Value = response.json().await?;
let ids: Vec<i32> = response_body["data"]["get_shout_followers"]
.as_array()
.ok_or("Failed to parse follower array")?
.iter()
.filter_map(|f| f["id"].as_i64().map(|id| id as i32))
.collect();
Ok(ids)
} else {
println!("Request failed with status: {}", response.status());
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"[get_shout_followers] Request failed with status: {}",
response.status()
),
)))
}
}
pub async fn is_fitting(
listener_id: i32,
message_data: SSEMessageData,
) -> Result<bool, &'static str> {
if message_data.entity == "reaction" {
// payload is Reaction
let shout_id = message_data.payload.get("shout").unwrap().as_str().unwrap();
let recipients = get_shout_followers(shout_id).await.unwrap();
Ok(recipients.contains(&listener_id))
} else if message_data.entity == "shout" {
// payload is Shout
// TODO: check all shout.communities subscribers if no then
// TODO: check all shout.topics subscribers if no then
// TODO: check all shout.authors subscribers ???
Ok(true)
} else if message_data.entity == "chat" {
// payload is Chat
Ok(true)
} else if message_data.entity == "message" {
// payload is Message
Ok(true)
} else if message_data.entity == "follower" {
// payload is Author
Ok(true)
} else {
eprintln!("[data] unknown entity");
eprintln!("{:?}", message_data);
Ok(false)
}
}

View File

@@ -1,204 +1,209 @@
use actix_web::error::{ErrorInternalServerError as ServerError, ErrorUnauthorized};
use actix_web::middleware::Logger;
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
use futures::StreamExt;
use redis::{AsyncCommands, Client};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use std::collections::HashMap;
use actix_web::{
error::{ErrorInternalServerError, ErrorUnauthorized},
middleware::Logger,
web, App, HttpRequest, HttpResponse, HttpServer, Result,
};
use aws_config::{load_defaults, BehaviorVersion};
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::primitives::ByteStream;
use image::DynamicImage;
use image::imageops::FilterType;
use mime_guess::MimeGuess;
use redis::{aio::MultiplexedConnection, AsyncCommands};
use redis::Client as RedisClient;
use std::env;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use sentry::types::Dsn;
use sentry_actix;
use std::io::Cursor;
use std::path::Path;
mod data;
const MAX_QUOTA_BYTES: u64 = 2 * 1024 * 1024 * 1024; // 2 GB per week
#[derive(Clone)]
struct AppState {
tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
redis: Client,
redis: MultiplexedConnection, // Redis connection for managing quotas and file names
s3_client: S3Client, // S3 client for uploading files
s3_bucket: String, // S3 bucket name for storing files
cdn_domain: String, // CDN domain for generating URLs
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct RedisMessageData {
payload: HashMap<String, Value>,
action: String
// Generate a thumbnail for the image
fn generate_thumbnail(image: &DynamicImage) -> Result<Vec<u8>, actix_web::Error> {
let thumbnail = image.resize(320, 320, FilterType::Lanczos3); // Размер миниатюры 320x320
let mut buffer = Vec::new();
thumbnail
.write_to(&mut Cursor::new(&mut buffer), image::ImageFormat::Jpeg)
.map_err(|_| ErrorInternalServerError("Failed to generate thumbnail"))?;
Ok(buffer)
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SSEMessageData {
payload: HashMap<String, Value>,
action: String,
entity: String
// Upload the file to S3 and return the URL
async fn upload_to_s3(
s3_client: &S3Client,
bucket: &str,
key: &str,
body: Vec<u8>,
content_type: &str,
cdn_domain: &str,
) -> Result<String, actix_web::Error> {
let body_stream = ByteStream::from(body);
s3_client.put_object()
.bucket(bucket)
.key(key)
.body(body_stream)
.content_type(content_type)
.send()
.await
.map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?;
Ok(format!("{}/{}", cdn_domain, key))
}
async fn connect_handler(
// Check and update the user's quota
async fn check_and_update_quota(
redis: &mut MultiplexedConnection,
user_id: &str,
file_size: u64,
) -> Result<(), actix_web::Error> {
let current_quota: u64 = redis.get(user_id).await.unwrap_or(0);
if current_quota + file_size > MAX_QUOTA_BYTES {
return Err(ErrorUnauthorized("Quota exceeded"));
}
redis.incr(user_id, file_size).await.map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))
}
// Proxy handler for serving static files and uploading them to S3
async fn proxy_handler(
req: HttpRequest,
path: web::Path<String>,
state: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> {
let token = req.headers().get("Authorization").and_then(|header_value| header_value.to_str().ok());
let token = match req.headers().get("Authorization") {
Some(val) => val.to_str().unwrap_or("").split(" ").last().unwrap_or(""),
None => match req.match_info().get("token") {
Some(val) => val,
None => match req.query_string().split('=').last() {
Some(val) => val,
None => return Err(ErrorUnauthorized("Unauthorized")),
},
},
};
// Validate token (implementation needed)
if token.is_none() {
return Err(ErrorUnauthorized("Unauthorized"));
}
let listener_id = data::get_id_by_token(&token).await.map_err(|e| {
eprintln!("TOKEN check failed: {}", e);
ErrorUnauthorized("Unauthorized")
})?;
let user_id = token.unwrap(); // Assuming the token is the user ID, adjust as necessary
let mut con = state.redis.get_multiplexed_async_connection().await.map_err(|e| {
eprintln!("Failed to get async connection: {}", e);
ServerError("Internal Server Error")
})?;
// Load the file (implement your file loading logic)
let file_path = path.into_inner();
let mime_type = MimeGuess::from_path(&file_path).first_or_octet_stream();
let extension = Path::new(&file_path)
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("bin");
con.sadd::<&str, &i32, usize>("authors-online", &listener_id)
.await
.map_err(|e| {
eprintln!("Failed to add author to online list: {}", e);
ServerError("Internal Server Error")
})?;
// Handle image files: generate thumbnail and upload both
if mime_type.type_() == "image" {
let image = image::open(&file_path).map_err(|_| ErrorInternalServerError("Failed to open image"))?;
let chats: Vec<String> = con
.smembers::<String, Vec<String>>(format!("chats_by_author/{}", listener_id))
.await
.map_err(|e| {
eprintln!("Failed to get chats by author: {}", e);
ServerError("Internal Server Error")
})?;
// Generate thumbnail
let thumbnail_data = generate_thumbnail(&image)?;
let thumbnail_key = format!("thumbnail_{}.{}", file_path, "jpg");
let (tx, rx) = broadcast::channel(100);
let state_clone = state.clone();
let handle = tokio::spawn(async move {
let mut pubsub = state_clone.redis.get_async_pubsub().await.unwrap();
let followers_channel = format!("follower:{}", listener_id);
pubsub.subscribe(followers_channel.clone()).await.unwrap();
println!("'{}' pubsub subscribed", followers_channel);
pubsub.subscribe("shout").await.unwrap();
println!("'shout' pubsub subscribed");
pubsub.subscribe("reaction").await.unwrap();
println!("'reaction' pubsub subscribed");
// Upload the thumbnail
upload_to_s3(
&state.s3_client,
&state.s3_bucket,
&thumbnail_key,
thumbnail_data.clone(),
"image/jpeg",
&state.cdn_domain,
)
.await?;
// chats by member_id
pubsub.subscribe(format!("chat:{}", listener_id)).await.unwrap();
println!("'chat:{}' pubsub subscribed", listener_id);
// Prepare original image data
let mut original_buffer = Vec::new();
image.write_to(&mut Cursor::new(&mut original_buffer), image::ImageFormat::Jpeg)
.map_err(|_| ErrorInternalServerError("Failed to read image data"))?;
// Upload the original image
let image_key = format!("{}.{}", file_path, extension);
let image_url = upload_to_s3(
&state.s3_client,
&state.s3_bucket,
&image_key,
original_buffer.clone(),
mime_type.essence_str(),
&state.cdn_domain,
)
.await?;
// messages by chat_id
for chat_id in &chats {
let channel_name = format!("message:{}", chat_id);
pubsub.subscribe(&channel_name).await.unwrap();
println!("'{}' subscribed", channel_name);
}
// Update quota and save filename
check_and_update_quota(&mut state.redis.clone(), user_id, original_buffer.len() as u64).await?;
save_filename_in_redis(&mut state.redis.clone(), user_id, &image_key).await?;
while let Some(msg) = pubsub.on_message().next().await {
let redis_message_str: String = msg.get_payload().unwrap();
let redis_message_data: RedisMessageData = serde_json::from_str(&redis_message_str).unwrap();
let prepared_message_data = SSEMessageData {
payload: redis_message_data.payload,
action: redis_message_data.action,
entity: msg.get_channel_name()
.to_owned()
.split(":")
.next()
.unwrap_or("")
.to_string()
};
if data::is_fitting(
listener_id,
prepared_message_data.clone(),
)
.await
.is_ok()
{
let prepared_message_str = serde_json::to_string(&prepared_message_data).unwrap();
let send_result = tx.send(prepared_message_str.clone());
if send_result.is_err() {
// remove author from online list
let _ = con
.srem::<&str, &i32, usize>("authors-online", &listener_id)
.await
.map_err(|e| {
eprintln!("Failed to remove author from online list: {}", e);
ServerError("Internal Server Error")
});
break;
} else {
println!("[handler] message handled {}", prepared_message_str);
}
};
}
});
state
.tasks
.lock()
.unwrap()
.insert(format!("{}", listener_id.clone()), handle);
return Ok(HttpResponse::Ok().body(format!("Image and thumbnail uploaded to: {}", image_url)));
}
let server_event_stream = futures::stream::unfold(rx, |mut rx| async {
let result = rx.recv().await;
match result {
Ok(server_event) => {
// Generate a random UUID as the event ID
let event_id = format!("{}", Uuid::new_v4());
// Handle non-image files
let file_data = std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?;
let file_size = file_data.len() as u64;
let formatted_server_event = format!(
"id: {}\ndata: {}\n\n",
event_id,
server_event
);
// Check and update the user's quota
check_and_update_quota(&mut state.redis.clone(), user_id, file_size).await?;
Some((Ok::<_, actix_web::Error>(Bytes::from(formatted_server_event)), rx))
},
Err(_) => None,
}
});
// Upload the file
let file_key = format!("{}.{}", file_path, extension);
let file_url = upload_to_s3(
&state.s3_client,
&state.s3_bucket,
&file_key,
file_data,
mime_type.essence_str(),
&state.cdn_domain,
)
.await?;
Ok(HttpResponse::Ok()
.append_header(("content-type", "text/event-stream"))
.streaming(server_event_stream))
// Save the filename in Redis for this user
save_filename_in_redis(&mut state.redis.clone(), user_id, &file_key).await?;
Ok(HttpResponse::Ok().body(format!("File uploaded to: {}", file_url)))
}
// Save filename in Redis for a specific user
async fn save_filename_in_redis(
redis: &mut MultiplexedConnection,
user_id: &str,
filename: &str,
) -> Result<(), actix_web::Error> {
redis.sadd(user_id, filename).await.map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))
}
// Main function to start the server
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1/"));
let client = redis::Client::open(redis_url.clone()).unwrap();
let tasks = Arc::new(Mutex::new(HashMap::new()));
let state = AppState {
tasks: tasks.clone(),
redis: client.clone(),
};
println!("Starting...");
if let Ok(sentry_dsn) = Dsn::from_str(
&env::var("GLITCHTIP_DSN").unwrap_or_default(),
) {
let sentry_options = sentry::ClientOptions {
release: sentry::release_name!(),
..Default::default()
};
let _guard = sentry::init((sentry_dsn, sentry_options));
println!("Sentry initialized...");
} else {
eprintln!("Invalid DSN, sentry was not initialized.");
}
let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set");
let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL");
let redis_connection = redis_client.get_multiplexed_async_connection().await.ok().unwrap();
// Initialize AWS S3 client
let s3_bucket = env::var("S3_BUCKET").expect("S3_BUCKET must be set");
let cdn_domain = env::var("CDN_DOMAIN").expect("CDN_DOMAIN must be set");
let config = load_defaults(BehaviorVersion::latest()).await;
let s3_client = S3Client::new(&config);
// Create application state
let app_state = web::Data::new(AppState {
redis: redis_connection,
s3_client,
s3_bucket,
cdn_domain,
});
// Start HTTP server
HttpServer::new(move || {
App::new()
.wrap(sentry_actix::Sentry::new())
.app_data(app_state.clone())
.wrap(Logger::default())
.app_data(web::Data::new(state.clone()))
.route("/", web::get().to(connect_handler))
.route("/{token}", web::get().to(connect_handler))
.route("/{path:.*}", web::get().to(proxy_handler))
})
.bind("0.0.0.0:8080")?
.bind("127.0.0.1:8080")?
.run()
.await
}