diff --git a/src/main.rs b/src/main.rs index 21fb967..33055cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,12 +4,12 @@ use actix_web::{ web, App, HttpRequest, HttpResponse, HttpServer, Result, }; use aws_config::BehaviorVersion; -use aws_sdk_s3::{config::Credentials, error::SdkError, Client as S3Client}; use aws_sdk_s3::primitives::ByteStream; -use image::{DynamicImage, imageops::FilterType}; +use aws_sdk_s3::{config::Credentials, error::SdkError, Client as S3Client}; +use image::{imageops::FilterType, DynamicImage}; use mime_guess::MimeGuess; -use redis::{aio::MultiplexedConnection, AsyncCommands}; use redis::Client as RedisClient; +use redis::{aio::MultiplexedConnection, AsyncCommands}; use std::env; use std::io::Cursor; use std::path::Path; @@ -21,20 +21,23 @@ struct AppState { redis: MultiplexedConnection, s3_client: S3Client, s3_bucket: String, - cdn_domain: String, + aws_bucket: String, } impl AppState { async fn new() -> Self { let redis_url = env::var("REDIS_URL").expect("REDIS_URL must be set"); let redis_client = RedisClient::open(redis_url).expect("Invalid Redis URL"); - let redis_connection = redis_client.get_multiplexed_async_connection().await.unwrap(); + let redis_connection = redis_client + .get_multiplexed_async_connection() + .await + .unwrap(); let s3_access_key = env::var("STORJ_ACCESS_KEY").expect("STORJ_ACCESS_KEY must be set"); let s3_secret_key = env::var("STORJ_SECRET_KEY").expect("STORJ_SECRET_KEY must be set"); let s3_endpoint = env::var("STORJ_END_POINT").expect("STORJ_END_POINT must be set"); let s3_bucket = env::var("STORJ_BUCKET_NAME").expect("STORJ_BUCKET_NAME must be set"); - let cdn_domain = env::var("CDN_DOMAIN").expect("CDN_DOMAIN must be set"); + let aws_bucket = env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME must be set"); let config = aws_config::defaults(BehaviorVersion::latest()) .region("eu-west-1") @@ -55,7 +58,7 @@ impl AppState { redis: redis_connection, s3_client, s3_bucket, - cdn_domain, + aws_bucket, } } } @@ 
-77,10 +80,10 @@ async fn upload_to_s3( key: &str, body: Vec<u8>, content_type: &str, - cdn_domain: &str, ) -> Result<String, actix_web::Error> { let body_stream = ByteStream::from(body); - s3_client.put_object() + s3_client + .put_object() .bucket(bucket) .key(key) .body(body_stream) @@ -89,13 +92,19 @@ async fn upload_to_s3( .await .map_err(|_| ErrorInternalServerError("Failed to upload file to S3"))?; - Ok(format!("{}/{}", cdn_domain, key)) + Ok(key.to_string()) } -async fn check_file_exists(s3_client: &S3Client, bucket: &str, key: &str) -> Result<bool, actix_web::Error> { +async fn check_file_exists( + s3_client: &S3Client, + bucket: &str, + key: &str, +) -> Result<bool, actix_web::Error> { match s3_client.head_object().bucket(bucket).key(key).send().await { Ok(_) => Ok(true), - Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => Ok(false), + Err(SdkError::ServiceError(service_error)) if service_error.err().is_not_found() => { + Ok(false) + } Err(e) => Err(ErrorInternalServerError(e.to_string())), } } @@ -109,7 +118,10 @@ async fn check_and_update_quota( if current_quota + file_size > MAX_QUOTA_BYTES { return Err(ErrorUnauthorized("Quota exceeded")); } - redis.incr(user_id, file_size).await.map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?; + redis + .incr(user_id, file_size) + .await + .map_err(|_| ErrorInternalServerError("Failed to update quota in Redis"))?; Ok(()) } @@ -118,7 +130,62 @@ async fn save_filename_in_redis( user_id: &str, filename: &str, ) -> Result<(), actix_web::Error> { - redis.sadd(user_id, filename).await.map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; + redis + .sadd(user_id, filename) + .await + .map_err(|_| ErrorInternalServerError("Failed to save filename in Redis"))?; + Ok(()) +} + +async fn upload_files_from_aws( + aws_client: &S3Client, + aws_bucket: &str, + storj_client: &S3Client, + storj_bucket: &str, +) -> Result<(), actix_web::Error> { + let list_objects_v2 = aws_client.list_objects_v2(); + let list_response = 
list_objects_v2 + .bucket(aws_bucket) + .send() + .await + .map_err(|_| ErrorInternalServerError("Failed to list files from AWS S3"))?; + + if let Some(objects) = list_response.contents { + for object in objects { + if let Some(key) = object.key { + // Get the object from AWS S3 + let object_response = aws_client + .get_object() + .bucket(aws_bucket) + .key(&key) + .send() + .await + .map_err(|_| ErrorInternalServerError("Failed to get object from AWS S3"))?; + + let body = object_response + .body + .collect() + .await + .map_err(|_| ErrorInternalServerError("Failed to read object body"))?; + let content_type = object_response + .content_type + .unwrap_or_else(|| "application/octet-stream".to_string()); + + // Upload the object to Storj S3 + let storj_url = upload_to_s3( + storj_client, + storj_bucket, + &key, + body.into_bytes().to_vec(), + &content_type, + ) + .await?; + + println!("Uploaded {} to Storj at {}", key, storj_url); + } + } + } + Ok(()) } @@ -127,7 +194,10 @@ async fn proxy_handler( path: web::Path<String>, state: web::Data<AppState>, ) -> Result<HttpResponse> { - let token = req.headers().get("Authorization").and_then(|header_value| header_value.to_str().ok()); + let token = req + .headers() + .get("Authorization") + .and_then(|header_value| header_value.to_str().ok()); if token.is_none() { return Err(ErrorUnauthorized("Unauthorized")); } @@ -136,10 +206,14 @@ async fn proxy_handler( let file_path = path.into_inner(); let mime_type = MimeGuess::from_path(&file_path).first_or_octet_stream(); - let extension = Path::new(&file_path).extension().and_then(|ext| ext.to_str()).unwrap_or("bin"); + let extension = Path::new(&file_path) + .extension() + .and_then(|ext| ext.to_str()) + .unwrap_or("bin"); if mime_type.type_() == "image" { - let image = image::open(&file_path).map_err(|_| ErrorInternalServerError("Failed to open image"))?; + let image = image::open(&file_path) + .map_err(|_| ErrorInternalServerError("Failed to open image"))?; // Define thumbnail sizes let thumbnail_sizes = 
vec![40, 110, 300, 600, 800]; @@ -150,28 +224,54 @@ async fn proxy_handler( // Check if thumbnail already exists if !check_file_exists(&state.s3_client, &state.s3_bucket, &thumbnail_key).await? { - upload_to_s3(&state.s3_client, &state.s3_bucket, &thumbnail_key, thumbnail_data, "image/jpeg", &state.cdn_domain).await?; + upload_to_s3( + &state.s3_client, + &state.s3_bucket, + &thumbnail_key, + thumbnail_data, + "image/jpeg", + ) + .await?; } } // Prepare original image data let mut original_buffer = Vec::new(); - image.write_to(&mut Cursor::new(&mut original_buffer), image::ImageFormat::Jpeg) + image + .write_to( + &mut Cursor::new(&mut original_buffer), + image::ImageFormat::Jpeg, + ) .map_err(|_| ErrorInternalServerError("Failed to read image data"))?; // Upload the original image let image_key = format!("{}.{}", file_path, extension); - let image_url = upload_to_s3(&state.s3_client, &state.s3_bucket, &image_key, original_buffer.clone(), mime_type.essence_str(), &state.cdn_domain).await?; + let image_url = upload_to_s3( + &state.s3_client, + &state.s3_bucket, + &image_key, + original_buffer.clone(), + mime_type.essence_str(), + ) + .await?; // Update quota and save filename - check_and_update_quota(&mut state.redis.clone(), user_id, original_buffer.len() as u64).await?; + check_and_update_quota( + &mut state.redis.clone(), + user_id, + original_buffer.len() as u64, + ) + .await?; save_filename_in_redis(&mut state.redis.clone(), user_id, &image_key).await?; - return Ok(HttpResponse::Ok().body(format!("Image and thumbnails uploaded to: {}", image_url))); + return Ok( + HttpResponse::Ok().body(format!("Image and thumbnails uploaded to: {}", image_url)) + ); } // Handle non-image files - let file_data = std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?; + let file_data = + std::fs::read(&file_path).map_err(|_| ErrorInternalServerError("Failed to read file"))?; let file_size = file_data.len() as u64; // Check and update the user's 
quota @@ -179,7 +279,14 @@ async fn proxy_handler( // Upload the file let file_key = format!("{}.{}", file_path, extension); - let file_url = upload_to_s3(&state.s3_client, &state.s3_bucket, &file_key, file_data, mime_type.essence_str(), &state.cdn_domain).await?; + let file_url = upload_to_s3( + &state.s3_client, + &state.s3_bucket, + &file_key, + file_data, + mime_type.essence_str(), + ) + .await?; // Save the filename in Redis for this user save_filename_in_redis(&mut state.redis.clone(), user_id, &file_key).await?; @@ -190,6 +297,16 @@ async fn proxy_handler( async fn main() -> std::io::Result<()> { let app_state = AppState::new().await; + // Example of uploading files from AWS S3 to Storj + upload_files_from_aws( + &app_state.s3_client, + &app_state.aws_bucket, + &app_state.s3_client, + &app_state.s3_bucket, + ) + .await + .expect("Failed to upload files from AWS to Storj"); + HttpServer::new(move || { App::new() .app_data(web::Data::new(app_state.clone())) @@ -199,4 +316,4 @@ async fn main() -> std::io::Result<()> { .bind("127.0.0.1:8080")? .run() .await -} \ No newline at end of file +}