diff --git a/Cargo.lock b/Cargo.lock index c53f74e..9a90fde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -92,12 +92,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "anyhow" -version = "1.0.72" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" - [[package]] name = "arc-swap" version = "1.6.0" @@ -992,6 +986,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "md5" version = "0.7.0" @@ -1522,30 +1526,35 @@ dependencies = [ name = "rustus" version = "0.1.0" dependencies = [ - "anyhow", "async-trait", "axum", "base64 0.21.5", "bytes", "chrono", "clap", + "digest", "enum_dispatch", "fern", "futures", "log", + "md-5", "mimalloc", "mime", "mime_guess", "mobc", "redis", + "reqwest", "rust-s3", "rustc-hash", "serde", "serde_json", + "sha1", + "sha2", "strum", "thiserror", "tokio", "tokio-util", + "tower", "uuid", ] @@ -1669,6 +1678,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index d0d5091..d0b1d19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.72" async-trait = "0.1.74" axum = { git = "https://github.com/tokio-rs/axum.git", rev = "3ff45d9" } base64 = "0.21.5" @@ -35,3 +34,10 @@ tokio = { version = "1.31.0", features = ["full"] } tokio-util = { version = "0.7.10", features = ["io"] } uuid = { version = "1.5.0", features = ["v4"] } rust-s3 = "^0.33" +tower = "0.4.13" +# Hashsums +sha1 = { version = "^0.10.1", features = ["compress"] } +sha2 = { version = "^0.10.1", features = ["compress"] } +md-5 = "^0.10.1" +digest = "^0.10.1" +reqwest = "0.11.22" diff --git a/src/config.rs b/src/config.rs index 0d4e1c8..ab09c00 100644 --- a/src/config.rs +++ b/src/config.rs @@ -82,7 +82,7 @@ impl Config { } } - pub fn get_url(&self, url: String) -> String { + pub fn get_url(&self, url: &str) -> String { format!("{}/{url}", self.url) } } diff --git a/src/data_storage/base.rs b/src/data_storage/base.rs index 4e41643..0bdd3e3 100644 --- a/src/data_storage/base.rs +++ b/src/data_storage/base.rs @@ -18,7 +18,7 @@ pub trait Storage { /// It MUST throw errors if connection can't /// be established or in any other case that might /// be a problem later on. - async fn prepare(&mut self) -> anyhow::Result<()>; + async fn prepare(&mut self) -> RustusResult<()>; /// Get contents of a file. /// @@ -48,7 +48,7 @@ pub trait Storage { /// # Params /// `file_info` - info about current file. /// `bytes` - bytes to append to the file. - async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> anyhow::Result<()>; + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()>; /// Create file in storage. /// @@ -58,7 +58,7 @@ pub trait Storage { /// /// # Params /// `file_info` - info about current file. - async fn create_file(&self, file_info: &FileInfo) -> anyhow::Result; + async fn create_file(&self, file_info: &FileInfo) -> RustusResult; /// Concatenate files. /// @@ -73,7 +73,7 @@ pub trait Storage { &self, file_info: &FileInfo, parts_info: Vec, - ) -> anyhow::Result<()>; + ) -> RustusResult<()>; /// Remove file from storage /// @@ -82,5 +82,5 @@ pub trait Storage { /// /// # Params /// `file_info` - info about current file. - async fn remove_file(&self, file_info: &FileInfo) -> anyhow::Result<()>; + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()>; } diff --git a/src/data_storage/file_storage.rs b/src/data_storage/file_storage.rs index 76e00f7..ea43888 100644 --- a/src/data_storage/file_storage.rs +++ b/src/data_storage/file_storage.rs @@ -32,7 +32,7 @@ impl FileStorage { } } - pub fn data_file_path(&self, file_id: &str) -> anyhow::Result { + pub fn data_file_path(&self, file_id: &str) -> RustusResult { let dir = self .data_dir // We're working wit absolute paths, because tus.io says so. @@ -48,7 +48,7 @@ impl Storage for FileStorage { "file" } - async fn prepare(&mut self) -> anyhow::Result<()> { + async fn prepare(&mut self) -> RustusResult<()> { // We're creating directory for new files // if it doesn't already exist. if !self.data_dir.exists() { @@ -78,7 +78,7 @@ impl Storage for FileStorage { Ok((headers, body).into_response()) } - async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> anyhow::Result<()> { + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> { // In normal situation this `if` statement is not // gonna be called, but what if it is ... if file_info.path.is_none() { @@ -108,7 +108,7 @@ impl Storage for FileStorage { .await? } - async fn create_file(&self, file_info: &FileInfo) -> anyhow::Result { + async fn create_file(&self, file_info: &FileInfo) -> RustusResult { // New path to file. let file_path = self.data_file_path(file_info.id.as_str())?; tokio::task::spawn_blocking(move || { @@ -128,7 +128,7 @@ impl Storage for FileStorage { &self, file_info: &FileInfo, parts_info: Vec, - ) -> anyhow::Result<()> { + ) -> RustusResult<()> { let force_fsync = self.force_fsync; let path = file_info.path.as_ref().unwrap().clone(); tokio::task::spawn_blocking(move || { @@ -157,7 +157,7 @@ impl Storage for FileStorage { .await? } - async fn remove_file(&self, file_info: &FileInfo) -> anyhow::Result<()> { + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { let info = file_info.clone(); tokio::task::spawn_blocking(move || { // Let's remove the file itself. diff --git a/src/data_storage/mod.rs b/src/data_storage/mod.rs index bf49025..dee15b0 100644 --- a/src/data_storage/mod.rs +++ b/src/data_storage/mod.rs @@ -1,4 +1,4 @@ -use crate::config::Config; +use crate::{config::Config, errors::RustusResult}; pub mod base; pub mod file_storage; @@ -11,7 +11,7 @@ pub enum DataStorageImpl { } impl DataStorageImpl { - pub fn new(_config: &Config) -> anyhow::Result { + pub fn new(_config: &Config) -> RustusResult { Ok(Self::File(file_storage::FileStorage::new( "./data".into(), "{year}/{month}/{day}/".into(), @@ -28,7 +28,7 @@ impl base::Storage for DataStorageImpl { } } - async fn prepare(&mut self) -> anyhow::Result<()> { + async fn prepare(&mut self) -> RustusResult<()> { match self { Self::File(file) => file.prepare().await, Self::S3Hybrid(s3) => s3.prepare().await, @@ -50,7 +50,7 @@ impl base::Storage for DataStorageImpl { &self, file_info: &crate::models::file_info::FileInfo, bytes: bytes::Bytes, - ) -> anyhow::Result<()> { + ) -> RustusResult<()> { match self { Self::File(file) => file.add_bytes(file_info, bytes).await, Self::S3Hybrid(s3) => s3.add_bytes(file_info, bytes).await, @@ -60,7 +60,7 @@ impl base::Storage for DataStorageImpl { async fn create_file( &self, file_info: &crate::models::file_info::FileInfo, - ) -> anyhow::Result { + ) -> RustusResult { match self { Self::File(file) => file.create_file(file_info).await, Self::S3Hybrid(s3) => s3.create_file(file_info).await, @@ -71,7 +71,7 @@ impl base::Storage for DataStorageImpl { &self, file_info: &crate::models::file_info::FileInfo, parts_info: Vec, - ) -> anyhow::Result<()> { + ) -> RustusResult<()> { match self { Self::File(file) => file.concat_files(file_info, parts_info).await, Self::S3Hybrid(s3) => s3.concat_files(file_info, parts_info).await, @@ -81,7 +81,7 @@ impl base::Storage for DataStorageImpl { async fn remove_file( &self, file_info: &crate::models::file_info::FileInfo, - ) -> anyhow::Result<()> { + ) -> RustusResult<()> { match self { Self::File(file) => file.remove_file(file_info).await, Self::S3Hybrid(s3) => s3.remove_file(file_info).await, diff --git a/src/data_storage/s3_hybrid.rs b/src/data_storage/s3_hybrid.rs index 6d76a1d..484d818 100644 --- a/src/data_storage/s3_hybrid.rs +++ b/src/data_storage/s3_hybrid.rs @@ -99,7 +99,7 @@ impl S3HybridStorage { /// /// This function is called to upload file to s3 completely. /// It streams file directly from disk to s3. - async fn upload_file(&self, file_info: &FileInfo) -> anyhow::Result<()> { + async fn upload_file(&self, file_info: &FileInfo) -> RustusResult<()> { if file_info.path.is_none() { return Err(RustusError::UnableToWrite("Cannot get upload path.".into()).into()); } @@ -128,7 +128,7 @@ impl Storage for S3HybridStorage { "s3_hybrid" } - async fn prepare(&mut self) -> anyhow::Result<()> { + async fn prepare(&mut self) -> RustusResult<()> { Ok(()) } @@ -150,7 +150,7 @@ impl Storage for S3HybridStorage { Ok(([disp.0, disp.1], body).into_response()) } - async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> anyhow::Result<()> { + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> { let part_len = bytes.len(); self.local_storage.add_bytes(file_info, bytes).await?; // If upload is complete. Upload the resulting file onto S3. @@ -161,7 +161,7 @@ impl Storage for S3HybridStorage { Ok(()) } - async fn create_file(&self, file_info: &FileInfo) -> anyhow::Result { + async fn create_file(&self, file_info: &FileInfo) -> RustusResult { self.local_storage.create_file(file_info).await } @@ -169,11 +169,11 @@ impl Storage for S3HybridStorage { &self, _file_info: &FileInfo, _parts_info: Vec, - ) -> anyhow::Result<()> { + ) -> RustusResult<()> { Err(RustusError::Unimplemented("Hybrid s3 cannot concat files.".into()).into()) } - async fn remove_file(&self, file_info: &FileInfo) -> anyhow::Result<()> { + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { if Some(file_info.offset) == file_info.length { self.bucket .delete_object(self.get_s3_key(file_info)) diff --git a/src/errors.rs b/src/errors.rs index 50c28a8..5419cfe 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,23 +1,125 @@ +use std::io::{Error, ErrorKind}; + use axum::response::IntoResponse; +use log::error; + +use axum::http::StatusCode; pub type RustusResult = Result; -#[derive(Debug, thiserror::Error)] +#[derive(thiserror::Error, Debug)] pub enum RustusError { - #[error("Unable to prepare info storage: {0}")] - UnableToRemove(String), - #[error("Cannot write: {0}")] - UnableToWrite(String), - #[error("File not found.")] - FileNotFound, - #[error("Something really bad happened: {0}")] - AnyHowError(#[from] anyhow::Error), - #[error("Unimplemented: {0}")] + #[error("{0}")] Unimplemented(String), + #[error("Not found")] + FileNotFound, + #[error("File already exists")] + FileAlreadyExists, + #[error("Given offset is incorrect.")] + WrongOffset, + #[error("Unknown error")] + Unknown, + #[error("File is frozen")] + FrozenFile, + #[error("Size already known")] + SizeAlreadyKnown, + #[error("Unable to serialize object")] + UnableToSerialize(#[from] serde_json::Error), + #[error("Redis error: {0}")] + RedisError(#[from] redis::RedisError), + #[error("Redis pooling error: {0}")] + MobcError(#[from] mobc::Error), + #[error("Unable to get file information")] + UnableToReadInfo, + #[error("Unable to write file {0}")] + UnableToWrite(String), + #[error("Unable to remove file {0}")] + UnableToRemove(String), + #[error("Unable to prepare info storage. Reason: {0}")] + UnableToPrepareInfoStorage(String), + #[error("Unable to prepare storage. Reason: {0}")] + UnableToPrepareStorage(String), + #[error("Unknown extension: {0}")] + UnknownExtension(String), + #[error("Http request failed: {0}")] + HttpRequestError(#[from] reqwest::Error), + #[error("Hook invocation failed. Reason: {0}")] + HookError(String), + #[error("Unable to configure logging: {0}")] + LogConfigError(#[from] log::SetLoggerError), + #[cfg(feature = "amqp_notifier")] + #[error("AMQP error: {0}")] + AMQPError(#[from] lapin::Error), + #[cfg(feature = "amqp_notifier")] + #[error("AMQP pooling error error: {0}")] + AMQPPoolError(#[from] bb8::RunError), + #[error("Std error: {0}")] + StdError(#[from] std::io::Error), + #[error("Can't spawn task: {0}")] + TokioSpawnError(#[from] tokio::task::JoinError), + #[error("Unknown hashsum algorithm")] + UnknownHashAlgorithm, + #[error("Wrong checksum")] + WrongChecksum, + #[error("The header value is incorrect")] + WrongHeaderValue, + // #[error("Metrics error: {0}")] + // PrometheusError(#[from] prometheus::Error), + #[error("HTTP hook error. Returned status: {0}, Response text: {1}")] + HTTPHookError(u16, String, Option), + #[error("Found S3 error: {0}")] + S3Error(#[from] s3::error::S3Error), + #[error("Missing offset header")] + MissingOffset, +} + +/// This conversion allows us to use `RustusError` in the `main` function. +#[cfg_attr(coverage, no_coverage)] +impl From for Error { + fn from(err: RustusError) -> Self { + Error::new(ErrorKind::Other, err) + } +} + +impl RustusError { + fn get_status_code(&self) -> StatusCode { + match self { + RustusError::FileNotFound => StatusCode::NOT_FOUND, + RustusError::WrongOffset => StatusCode::CONFLICT, + RustusError::FrozenFile + | RustusError::SizeAlreadyKnown + | RustusError::HookError(_) + | RustusError::UnknownHashAlgorithm + | RustusError::WrongHeaderValue => StatusCode::BAD_REQUEST, + RustusError::WrongChecksum => StatusCode::EXPECTATION_FAILED, + RustusError::HTTPHookError(status, _, _) => { + StatusCode::from_u16(*status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) + } + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } } impl IntoResponse for RustusError { fn into_response(self) -> axum::response::Response { - axum::response::IntoResponse::into_response(self.to_string()) + log::error!("{self}"); + let status_code = self.get_status_code(); + match self { + RustusError::HTTPHookError(_, proxy_response, content_type) => { + axum::response::IntoResponse::into_response(( + status_code, + [( + "Content-Type", + content_type.unwrap_or("text/plain; charset=utf-8".into()), + )], + proxy_response, + )) + } + _ => axum::response::IntoResponse::into_response(( + status_code, + [("Content-Type", "text/html; charset=utf-8")], + format!("{self}"), + )), + } } } diff --git a/src/info_storages/base.rs b/src/info_storages/base.rs index 236551b..07ee5df 100644 --- a/src/info_storages/base.rs +++ b/src/info_storages/base.rs @@ -1,4 +1,4 @@ -use crate::models::file_info::FileInfo; +use crate::{models::file_info::FileInfo, errors::RustusResult}; /// Trait for every info storage. /// /// This trait defines required functions @@ -9,7 +9,7 @@ pub trait InfoStorage { /// In this function you can prepare /// you info storage. E.G. create a table in a database, /// or a directory somewhere. - async fn prepare(&mut self) -> anyhow::Result<()>; + async fn prepare(&mut self) -> RustusResult<()>; /// Set information about an upload. /// @@ -22,17 +22,17 @@ pub trait InfoStorage { /// about a file and actually store it. To bypass it /// we can guarantee that this parameter will never be `true` /// for any update operation. - async fn set_info(&self, file_info: &FileInfo, create: bool) -> anyhow::Result<()>; + async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()>; /// Retrieve information from storage. /// /// This function must return information about file /// from the given storage. - async fn get_info(&self, file_id: &str) -> anyhow::Result; + async fn get_info(&self, file_id: &str) -> RustusResult; /// This function removes information about file completely. /// /// This function must actually delete any stored information /// associated with the given `file_id`. - async fn remove_info(&self, file_id: &str) -> anyhow::Result<()>; + async fn remove_info(&self, file_id: &str) -> RustusResult<()>; } diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs index 014c42c..1d67a6e 100644 --- a/src/info_storages/file_info_storage.rs +++ b/src/info_storages/file_info_storage.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use tokio::fs::DirBuilder; -use crate::models::file_info::FileInfo; +use crate::{models::file_info::FileInfo, errors::RustusResult}; use super::base::InfoStorage; @@ -22,14 +22,14 @@ impl FileInfoStorage { } impl InfoStorage for FileInfoStorage { - async fn prepare(&mut self) -> anyhow::Result<()> { + async fn prepare(&mut self) -> RustusResult<()> { if !self.info_dir.exists() { DirBuilder::new().create(self.info_dir.as_path()).await?; } Ok(()) } - async fn set_info(&self, file_info: &FileInfo, create: bool) -> anyhow::Result<()> { + async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()> { let info = file_info.clone(); let path = self.info_file_path(info.id.as_str()); let file = tokio::fs::OpenOptions::new() @@ -44,7 +44,7 @@ impl InfoStorage for FileInfoStorage { Ok(()) } - async fn get_info(&self, file_id: &str) -> anyhow::Result { + async fn get_info(&self, file_id: &str) -> RustusResult { let info_path = self.info_file_path(file_id); let file = tokio::fs::File::open(info_path).await?; let mut reader = tokio::io::BufReader::new(file); @@ -53,7 +53,7 @@ impl InfoStorage for FileInfoStorage { Ok(serde_json::from_slice::(contents.as_slice())?) } - async fn remove_info(&self, file_id: &str) -> anyhow::Result<()> { + async fn remove_info(&self, file_id: &str) -> RustusResult<()> { let id = String::from(file_id); let info_path = self.info_file_path(id.as_str()); tokio::fs::remove_file(info_path).await?; diff --git a/src/info_storages/mod.rs b/src/info_storages/mod.rs index c5ce1ac..4cfc14b 100644 --- a/src/info_storages/mod.rs +++ b/src/info_storages/mod.rs @@ -1,4 +1,4 @@ -use crate::{config::Config, from_str}; +use crate::{config::Config, errors::RustusResult, from_str}; pub mod base; pub mod file_info_storage; @@ -23,15 +23,15 @@ pub enum InfoStorageImpl { } impl InfoStorageImpl { - pub async fn new(_config: &Config) -> anyhow::Result { - Ok(Self::Redis( - redis_info_storage::RedisStorage::new("redis://localhost", None).await?, - )) + pub async fn new(_config: &Config) -> RustusResult { + Ok(Self::File(file_info_storage::FileInfoStorage::new( + "./data".into(), + ))) } } impl base::InfoStorage for InfoStorageImpl { - async fn prepare(&mut self) -> anyhow::Result<()> { + async fn prepare(&mut self) -> RustusResult<()> { match self { Self::Redis(redis) => redis.prepare().await, Self::File(file) => file.prepare().await, @@ -42,21 +42,21 @@ impl base::InfoStorage for InfoStorageImpl { &self, file_info: &crate::models::file_info::FileInfo, create: bool, - ) -> anyhow::Result<()> { + ) -> RustusResult<()> { match self { Self::Redis(redis) => redis.set_info(file_info, create).await, Self::File(file) => file.set_info(file_info, create).await, } } - async fn get_info(&self, file_id: &str) -> anyhow::Result { + async fn get_info(&self, file_id: &str) -> RustusResult { match self { Self::Redis(redis) => redis.get_info(file_id).await, Self::File(file) => file.get_info(file_id).await, } } - async fn remove_info(&self, file_id: &str) -> anyhow::Result<()> { + async fn remove_info(&self, file_id: &str) -> RustusResult<()> { match self { Self::Redis(redis) => redis.remove_info(file_id).await, Self::File(file) => file.remove_info(file_id).await, diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs index 8ccd8b1..4a2f89f 100644 --- a/src/info_storages/redis_info_storage.rs +++ b/src/info_storages/redis_info_storage.rs @@ -1,7 +1,7 @@ use mobc::{Manager, Pool}; use redis::aio::Connection; -use crate::{errors::RustusError, models::file_info::FileInfo}; +use crate::{errors::{RustusError, RustusResult}, models::file_info::FileInfo}; use super::base::InfoStorage; @@ -41,7 +41,7 @@ pub struct RedisStorage { impl RedisStorage { #[allow(clippy::unused_async)] - pub async fn new(db_dsn: &str, expiration: Option) -> anyhow::Result { + pub async fn new(db_dsn: &str, expiration: Option) -> RustusResult { let client = redis::Client::open(db_dsn)?; let manager = RedisConnectionManager::new(client); let pool = mobc::Pool::builder().max_open(100).build(manager); @@ -50,11 +50,11 @@ impl RedisStorage { } impl InfoStorage for RedisStorage { - async fn prepare(&mut self) -> anyhow::Result<()> { + async fn prepare(&mut self) -> RustusResult<()> { Ok(()) } - async fn set_info(&self, file_info: &FileInfo, _create: bool) -> anyhow::Result<()> { + async fn set_info(&self, file_info: &FileInfo, _create: bool) -> RustusResult<()> { let mut conn = self.pool.get().await?; let mut cmd = redis::cmd("SET"); let mut cmd = cmd @@ -67,7 +67,7 @@ impl InfoStorage for RedisStorage { Ok(()) } - async fn get_info(&self, file_id: &str) -> anyhow::Result { + async fn get_info(&self, file_id: &str) -> RustusResult { let mut conn = self.pool.get().await?; let res = redis::cmd("GET") .arg(file_id) @@ -79,7 +79,7 @@ impl InfoStorage for RedisStorage { FileInfo::from_json(res.unwrap()) } - async fn remove_info(&self, file_id: &str) -> anyhow::Result<()> { + async fn remove_info(&self, file_id: &str) -> RustusResult<()> { let mut conn = self.pool.get().await?; let resp = redis::cmd("DEL") .arg(file_id) diff --git a/src/main.rs b/src/main.rs index f0cd3bf..f2e1d44 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![allow(async_fn_in_trait)] +use errors::RustusResult; use fern::{ colors::{Color, ColoredLevelConfig}, Dispatch, @@ -22,7 +23,7 @@ pub mod utils; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[cfg_attr(coverage, no_coverage)] -fn setup_logging(app_config: &Config) -> anyhow::Result<()> { +fn setup_logging(app_config: &Config) -> RustusResult<()> { let colors = ColoredLevelConfig::new() // use builder methods .info(Color::Green) @@ -47,7 +48,7 @@ fn setup_logging(app_config: &Config) -> anyhow::Result<()> { Ok(()) } -fn main() -> anyhow::Result<()> { +fn main() -> RustusResult<()> { let args = Config::parse(); setup_logging(&args)?; tokio::runtime::Builder::new_multi_thread() diff --git a/src/models/file_info.rs b/src/models/file_info.rs index cec93f2..e3d42a9 100644 --- a/src/models/file_info.rs +++ b/src/models/file_info.rs @@ -3,6 +3,8 @@ use chrono::{serde::ts_seconds, DateTime, Utc}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; +use crate::errors::RustusResult; + /// Information about file. /// It has everything about stored file. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -91,12 +93,12 @@ impl FileInfo { self.metadata.get("filename").unwrap_or(&self.id) } - pub fn json(&self) -> anyhow::Result { + pub fn json(&self) -> RustusResult { let info_clone = self.clone(); Ok(serde_json::to_string(&info_clone)?) } - pub fn from_json(data: String) -> anyhow::Result { + pub fn from_json(data: String) -> RustusResult { Ok(serde_json::from_str::(data.as_str())?) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 10d7f06..b783f2b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,11 +1,13 @@ -use crate::{config::Config, state::RustusState}; +use crate::{config::Config, state::RustusState, utils::headers::HeaderMapExt, errors::RustusResult}; +use axum::{extract::State, http::HeaderValue, ServiceExt}; +use tower::Layer; mod routes; async fn logger( req: axum::extract::Request, next: axum::middleware::Next, -) -> Result { +) -> impl axum::response::IntoResponse { let method = req.method().to_string(); let uri = req .uri() @@ -17,28 +19,87 @@ async fn logger( let res = next.run(req).await; let elapsed = time.elapsed().as_micros(); let status = res.status().as_u16(); - log::info!("{method} {uri} {status} {elapsed}"); - Ok(res) + + // log::log!(log::Level::Info, "ememe"); + if uri != "/health" { + let mut level = log::Level::Info; + if res.status().is_server_error() { + level = log::Level::Error; + log::error!("{:#?}", res.body()); + } + log::log!(level, "{method} {uri} {status} {elapsed}"); + } + + res +} + +async fn method_replacer( + mut req: axum::extract::Request, + next: axum::middleware::Next, +) -> impl axum::response::IntoResponse { + if let Some(new_method) = req.headers().get_method_override() { + *req.method_mut() = new_method; + req.headers_mut().remove("X-HTTP-Method-Override"); + } + next.run(req).await +} + +async fn add_tus_header( + State(state): State, + req: axum::extract::Request, + next: axum::middleware::Next, +) -> impl axum::response::IntoResponse { + let mut resp = next.run(req).await; + resp.headers_mut() + .insert("Tus-Resumable", HeaderValue::from_static("1.0.0")); + resp.headers_mut() + .insert("Tus-Version", HeaderValue::from_static("1.0.0")); + + let max_file_size = state + .max_file_size + .map(|val| val.to_string()) + .and_then(|val| HeaderValue::from_str(val.as_str()).ok()); + + if let Some(max_size) = max_file_size { + resp.headers_mut().insert("Tus-Max-Size", max_size); + } + + return resp; +} + +async fn healthcheck() -> impl axum::response::IntoResponse { + axum::http::StatusCode::OK } async fn fallback() -> impl axum::response::IntoResponse { (axum::http::StatusCode::NOT_FOUND, "Not found") } -pub async fn start_server(config: Config) -> anyhow::Result<()> { +pub async fn start_server(config: Config) -> RustusResult<()> { let state = RustusState::from_config(&config).await?; let app = axum::Router::new() .route("/", axum::routing::post(routes::create::create_route)) - .with_state(state); + .route("/", axum::routing::options(routes::info::info_route)) + .route( + "/:upload_id", + axum::routing::patch(routes::upload::upload_chunk_route), + ) + .with_state(state) + .route_layer(axum::middleware::from_fn_with_state( + config.clone(), + add_tus_header, + )); let listener = tokio::net::TcpListener::bind((config.host.clone(), config.port)).await?; - println!("Starting server at http://{}:{}", config.host, config.port); - axum::serve( - listener, - axum::Router::new() - .nest(&config.url, app) - .fallback(fallback) - .layer(axum::middleware::from_fn(logger)), - ) - .await?; + log::info!("Starting server at http://{}:{}", config.host, config.port); + + let main_router = axum::Router::new() + .route("/health", axum::routing::get(healthcheck)) + .nest(&config.url, app) + .fallback(fallback); + + let service = axum::middleware::from_fn(method_replacer) + .layer(axum::middleware::from_fn(logger).layer(main_router)); + + axum::serve(listener, service.into_make_service()).await?; Ok(()) } diff --git a/src/server/routes/create.rs b/src/server/routes/create.rs index 03078f9..c263cc2 100644 --- a/src/server/routes/create.rs +++ b/src/server/routes/create.rs @@ -6,8 +6,9 @@ use axum::{ use bytes::Bytes; use crate::{ - errors::RustusResult, extensions::TusExtensions, info_storages::base::InfoStorage, - models::file_info::FileInfo, state::RustusState, utils::headers::HeaderMapExt, + data_storage::base::Storage, errors::RustusResult, extensions::TusExtensions, + info_storages::base::InfoStorage, models::file_info::FileInfo, state::RustusState, + utils::headers::HeaderMapExt, }; pub async fn create_route( @@ -78,7 +79,53 @@ pub async fn create_route( } } + file_info.path = Some(state.data_storage.create_file(&file_info).await?); + + if file_info.is_final { + let mut final_size = 0; + let mut parts_info = Vec::new(); + for part_id in file_info.clone().parts.unwrap() { + let part = state.info_storage.get_info(part_id.as_str()).await?; + if part.length != Some(part.offset) { + return Ok(( + StatusCode::BAD_REQUEST, + format!("{} upload is not complete.", part.id), + ) + .into_response()); + } + if !part.is_partial { + return Ok(( + StatusCode::BAD_REQUEST, + format!("{} upload is not partial.", part.id), + ) + .into_response()); + } + final_size += &part.length.unwrap(); + parts_info.push(part.clone()); + } + state + .data_storage + .concat_files(&file_info, parts_info.clone()) + .await?; + file_info.offset = final_size; + file_info.length = Some(final_size); + if state.config.remove_parts { + for part in parts_info { + state.data_storage.remove_file(&part).await?; + state.info_storage.remove_info(part.id.as_str()).await?; + } + } + } + state.info_storage.set_info(&file_info, true).await?; + let upload_url = state.config.get_url(&file_info.id); - Ok(().into_response()) + Ok(( + StatusCode::CREATED, + [ + ("Location", upload_url.as_str()), + ("Upload-Offset", file_info.offset.to_string().as_str()), + ], + ) + .into_response()) } diff --git a/src/server/routes/info.rs b/src/server/routes/info.rs new file mode 100644 index 0000000..56eaec6 --- /dev/null +++ b/src/server/routes/info.rs @@ -0,0 +1,15 @@ +use axum::{extract::State, http::StatusCode}; + +use crate::state::RustusState; + +pub async fn info_route(State(ref state): State) -> impl axum::response::IntoResponse { + let extensions = state + .config + .tus_extensions + .iter() + .map(|ext| ext.to_string()) + .collect::>() + .join(","); + + (StatusCode::NO_CONTENT, [("Tus-Extension", extensions)]) +} diff --git a/src/server/routes/mod.rs b/src/server/routes/mod.rs index c5fb369..6ea3727 100644 --- a/src/server/routes/mod.rs +++ b/src/server/routes/mod.rs @@ -1 +1,3 @@ pub mod create; +pub mod info; +pub mod upload; diff --git a/src/server/routes/upload.rs b/src/server/routes/upload.rs new file mode 100644 index 0000000..6d6679b --- /dev/null +++ b/src/server/routes/upload.rs @@ -0,0 +1,26 @@ +use axum::{ + extract::Path, + http::{HeaderMap, StatusCode}, + response::IntoResponse, +}; + +use crate::{errors::{RustusResult, RustusError}, utils::headers::HeaderMapExt}; + +pub async fn upload_chunk_route( + Path(upload_id): Path, + headers: HeaderMap, +) -> RustusResult { + println!("hehehe {}", upload_id); + if !headers.check("Content-Type", |val| { + val == "application/offset+octet-stream" + }) { + return Ok((StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported media type").into_response()); + } + + let offset: Option = headers.parse("Upload-Offset"); + if offset.is_none() { + return Err(RustusError::MissingOffset); + } + + Ok("upload_chunk_route".into_response()) +} diff --git a/src/state.rs b/src/state.rs index d4dddca..d334b06 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,4 +1,9 @@ -use crate::{config::Config, data_storage::DataStorageImpl, info_storages::InfoStorageImpl}; +use crate::{ + config::Config, + data_storage::{base::Storage, DataStorageImpl}, + errors::RustusResult, + info_storages::{base::InfoStorage, InfoStorageImpl}, +}; #[derive(Clone)] pub struct RustusState { @@ -8,9 +13,12 @@ pub struct RustusState { } impl RustusState { - pub async fn from_config(config: &Config) -> anyhow::Result { - let info_storage = InfoStorageImpl::new(config).await?; - let data_storage = DataStorageImpl::new(config)?; + pub async fn from_config(config: &Config) -> RustusResult { + let mut info_storage = InfoStorageImpl::new(config).await?; + let mut data_storage = DataStorageImpl::new(config)?; + info_storage.prepare().await?; + data_storage.prepare().await?; + Ok(Self { config: config.clone(), info_storage, diff --git a/src/utils/hashes.rs b/src/utils/hashes.rs new file mode 100644 index 0000000..826ea18 --- /dev/null +++ b/src/utils/hashes.rs @@ -0,0 +1,131 @@ +use crate::errors::{RustusError, RustusResult}; +use axum::http::HeaderValue; +use base64::Engine; +use digest::Digest; + +/// Checks if hash-sum of a slice matches the given checksum. +fn checksum_verify(algo: &str, bytes: &[u8], checksum: &[u8]) -> RustusResult { + match algo { + "sha1" => { + let sum = sha1::Sha1::digest(bytes); + Ok(sum.as_slice() == checksum) + } + "sha256" => { + let sum = sha2::Sha256::digest(bytes); + Ok(sum.as_slice() == checksum) + } + "sha512" => { + let sum = sha2::Sha512::digest(bytes); + Ok(sum.as_slice() == checksum) + } + "md5" => { + let sum = md5::Md5::digest(bytes); + Ok(sum.as_slice() == checksum) + } + _ => Err(RustusError::UnknownHashAlgorithm), + } +} + +/// Verify checksum of a given chunk based on header's value. +/// +/// This function decodes given header value. +/// Format of the header is: +/// +/// +/// It tries decode header value to string, +/// splits it in two parts and after decoding base64 checksum +/// verifies it. +/// +/// # Errors +/// +/// It may return error if header value can't be represented as string, +/// if checksum can't be decoded with base64 or if unknown algorithm is used. +pub fn verify_chunk_checksum(header: &HeaderValue, data: &[u8]) -> RustusResult { + if let Ok(val) = header.to_str() { + let mut split = val.split(' '); + if let Some(algo) = split.next() { + if let Some(checksum_base) = split.next() { + let checksum = base64::engine::general_purpose::STANDARD + .decode(checksum_base) + .map_err(|_| { + log::error!("Can't decode checksum value"); + RustusError::WrongHeaderValue + })?; + return checksum_verify(algo, data, checksum.as_slice()); + } + } + Err(RustusError::WrongHeaderValue) + } else { + log::error!("Can't decode checksum header."); + Err(RustusError::WrongHeaderValue) + } +} + +#[cfg(test)] +mod tests { + use super::{checksum_verify, verify_chunk_checksum}; + use axum::http::HeaderValue; + + #[test] + fn test_success_checksum_verify() { + let res = checksum_verify( + "sha1", + b"hello", + b"\xaa\xf4\xc6\x1d\xdc\xc5\xe8\xa2\xda\xbe\xde\x0f;H,\xd9\xae\xa9CM", + ) + .unwrap(); + assert!(res); + let res = checksum_verify( + "sha256", + b"hello", + b",\xf2M\xba_\xb0\xa3\x0e&\xe8;*\xc5\xb9\xe2\x9e\x1b\x16\x1e\\\x1f\xa7B^s\x043b\x93\x8b\x98$", + ).unwrap(); + assert!(res); + let res = checksum_verify( + "sha512", + b"hello", + b"\x9bq\xd2$\xbdb\xf3x]\x96\xd4j\xd3\xea=s1\x9b\xfb\xc2\x89\x0c\xaa\xda\xe2\xdf\xf7%\x19g<\xa7##\xc3\xd9\x9b\xa5\xc1\x1d|z\xccn\x14\xb8\xc5\xda\x0cFcG\\.\\:\xde\xf4os\xbc\xde\xc0C", + ).unwrap(); + assert!(res); + let res = + checksum_verify("md5", b"hello", b"]A@*\xbcK*v\xb9q\x9d\x91\x10\x17\xc5\x92").unwrap(); + assert!(res); + } + + #[test] + fn test_sum_unknown_algo_checksum_verify() { + let res = checksum_verify("base64", "test".as_bytes(), b"dGVzdAo="); + assert!(res.is_err()); + } + + #[test] + fn test_success_verify_chunk_checksum() { + let res = verify_chunk_checksum( + &HeaderValue::from_str("md5 XUFAKrxLKna5cZ2REBfFkg==").unwrap(), + b"hello", + ) + .unwrap(); + assert!(res); + } + + #[test] + fn test_wrong_checksum() { + let res = verify_chunk_checksum(&HeaderValue::from_str("md5 memes==").unwrap(), b"hello"); + assert!(res.is_err()); + } + + #[test] + fn test_bytes_header() { + let res = verify_chunk_checksum( + &HeaderValue::from_bytes(b"ewq ]A@*\xbcK*v").unwrap(), + b"hello", + ); + assert!(res.is_err()); + } + + #[test] + fn test_badly_formatted_header() { + let res = verify_chunk_checksum(&HeaderValue::from_str("md5").unwrap(), b"hello"); + assert!(res.is_err()); + } +} diff --git a/src/utils/headers.rs b/src/utils/headers.rs index 92aedeb..a202581 100644 --- a/src/utils/headers.rs +++ b/src/utils/headers.rs @@ -14,6 +14,7 @@ pub trait HeaderMapExt { fn check(&self, name: &str, expr: fn(&str) -> bool) -> bool; fn get_metadata(&self) -> Option>; fn get_upload_parts(&self) -> Vec; + fn get_method_override(&self) -> Option; } impl HeaderMapExt for HeaderMap { @@ -65,6 +66,12 @@ impl HeaderMapExt for HeaderMap { }) .unwrap_or_default() } + + fn get_method_override(&self) -> Option { + self.get("X-HTTP-Method-Override") + .and_then(|header| header.to_str().ok()) + .and_then(|header| header.trim().parse().ok()) + } } pub fn generate_disposition(filename: &str) -> (Header, Header) { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 0bef5e2..b153f5b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod dir_struct; pub mod enums; pub mod headers; +pub mod hashes;