Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added sentry integration. #147

Merged
merged 6 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,083 changes: 712 additions & 371 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 21 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ path = "src/main.rs"

[dependencies]
async-trait = "0.1.74"
axum = { git = "https://github.com/tokio-rs/axum.git", rev = "3ff45d9" }
axum = { version = "0.7.1" }
base64 = "0.21.5"
bytes = "1.5.0"
chrono = { version = "0.4.26", features = ["serde"] }
Expand All @@ -25,17 +25,16 @@ enum_dispatch = "0.3.12"
fern = { version = "0.6.2", features = ["colored", "chrono"] }
futures = "0.3.29"
log = "0.4.20"
mimalloc = "0.1.39"
mime = "0.3.17"
mime_guess = "2.0.4"
mobc = "0.8.3"
redis = { version = "0.23.3", features = ["tokio-comp", "connection-manager"] }
redis = { version = "0.24.0", features = ["tokio-comp", "connection-manager"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.192", features = ["derive"] }
serde_json = "1.0.108"
strum = { version = "0.25.0", features = ["derive"] }
thiserror = "1.0.50"
tokio = { version = "1.31.0", features = ["full"] }
tokio = { version = "1.31.0", features = ["full", "bytes"] }
tokio-util = { version = "0.7.10", features = ["io"] }
uuid = { version = "1.5.0", features = ["v4"] }
rust-s3 = "^0.33"
Expand All @@ -47,10 +46,26 @@ md-5 = "^0.10.1"
digest = "^0.10.1"
reqwest = "0.11.22"
lapin = "2.3.1"
tower-http = { version = "0.4.4", features = ["cors"] }
http = "^0.2"
tower-http = { version = "0.5.0", features = ["cors", "trace"] }
wildmatch = "2.1.1"
tracing = "0.1.40"
sentry = "0.32.0"
sentry-tracing = "0.32.0"
sentry-tower = { version = "0.32.0", features = [
"http",
"axum",
"axum-matched-path",
] }
tracing-subscriber = { version = "0.3.18", features = [
"smallvec",
"parking_lot",
"time",
] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5.4", features = [
"background_threads_runtime_support",
] }

[profile.release]
opt-level = 3
Expand Down
7 changes: 6 additions & 1 deletion deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
COPY imgs ./imgs

ENV JEMALLOC_SYS_WITH_MALLOC_CONF="background_thread:true,metadata_thp:always,tcache:false,dirty_decay_ms:0,muzzy_decay_ms:0,abort_conf:true"
RUN cargo build --release --bin rustus

FROM debian:bullseye-20211201-slim AS base
FROM debian:bullseye-20231120-slim AS base

COPY --from=builder /app/target/release/rustus /usr/local/bin/
RUN apt update && apt install -y libssl-dev ca-certificates libjemalloc-dev && apt clean

VOLUME [ "/data" ]

ENTRYPOINT ["/usr/local/bin/rustus"]

Expand Down
4 changes: 3 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ please provide sentry-dsn to rustus.

``` bash
rustus --sentry-dsn "https://[email protected]/11" \
--sentry-sample-rate 1.0
--sentry-sample-rate 1.0 \
--sentry-environment "dev"
```

=== "ENV"

``` bash
export RUSTUS_SENTRY_DSN="https://[email protected]/11"
export RUSTUS_SENTRY_SAMPLE_RATE="1.0"
export RUSTUS_SENTRY_ENVIRONMENT="dev"

rustus
```
Expand Down
66 changes: 65 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,58 @@ pub struct NotificationConfig {
pub amqp_hook_opts: AMQPHooksOptions,
}

#[derive(Parser, Clone, Debug)]
pub struct SentryConfig {
/// Sentry DSN.
///
/// Link to sentry project, which is used to send events to.
#[arg(name = "sentry-dsn", long, env = "RUSTUS_SENTRY_DSN")]
pub dsn: Option<String>,

/// Sentry sample rate.
///
/// This option is used to set how often events are sent to sentry.
/// The default value is 1.0, which means that all events are sent.
#[arg(
name = "sentry-sample-rate",
long,
default_value = "1.0",
env = "RUSTUS_SENTRY_SAMPLE_RATE"
)]
pub sample_rate: Option<f32>,

/// Sentry traces sample rate.
///
/// This option is used to set how often traces are sent to sentry.
/// Traces are used to track performance, so this option might not be
/// useful for regular users.
#[arg(
name = "sentry-traces-sample-rate",
long,
env = "RUSTUS_SENTRY_TRACES_SAMPLE_RATE"
)]
pub traces_sample_rate: Option<f32>,

/// Sentry environment.
///
/// This option is used to set environment for sentry.
#[arg(name = "sentry-environment", long, env = "RUSTUS_SENTRY_ENVIRONMENT")]
pub environment: Option<String>,

/// DEvelopment option for sentry.
///
/// This option enables logging of sentry events,
/// which is useful for debugging. But it is not recommended
/// to enable this option in production.
#[arg(
name = "sentry-debug",
long,
default_value = "false",
env = "RUSTUS_SENTRY_DEBUG"
)]
pub debug: bool,
}

#[derive(Parser, Clone, Debug)]
#[command(author, version, about, long_about = None)]
#[allow(clippy::struct_excessive_bools)]
Expand All @@ -290,7 +342,7 @@ pub struct Config {

/// Log level for the server.
#[arg(long, default_value = "INFO", env = "RUSTUS_LOG_LEVEL")]
pub log_level: log::LevelFilter,
pub log_level: tracing::Level,

/// Number of worker threads for the server.
///
Expand All @@ -302,6 +354,15 @@ pub struct Config {
#[arg(long, default_value = "/files", env = "RUSTUS_PREFIX")]
pub url: String,

/// Option to disable access logging completely.
/// Useful when using sentry, to not spam with logs,
/// because sentry might incorrectly capture some access logs,
/// which is annoying.
///
/// By default it is disabled.
#[arg(long, default_value = "false", env = "RUSTUS_NO_ACCESS")]
pub no_access: bool,

/// Disable access log for health endpoint.
/// By default it is enabled.
#[arg(long, env = "RUSTUS_DISABLE_HEALTH_ACCESS_LOG")]
Expand Down Expand Up @@ -374,6 +435,9 @@ pub struct Config {

#[command(flatten)]
pub notification_config: NotificationConfig,

#[command(flatten)]
pub sentry_config: SentryConfig,
}

impl Config {
Expand Down
139 changes: 64 additions & 75 deletions src/data_storage/impls/file_storage.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::{io::Write, path::PathBuf};
use std::path::PathBuf;

use tokio::io::AsyncWriteExt;

use axum::response::{IntoResponse, Response};
use bytes::Bytes;
use log::error;
use std::{
fs::{remove_file, DirBuilder, OpenOptions},
io::{copy, BufReader, BufWriter},
};
use std::fs::DirBuilder;

use crate::{
data_storage::base::Storage,
Expand All @@ -15,7 +13,7 @@ use crate::{
utils::{dir_struct::substr_now, headers::HeaderMapExt},
};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FileStorage {
data_dir: PathBuf,
dir_struct: String,
Expand Down Expand Up @@ -82,45 +80,42 @@ impl Storage for FileStorage {
Ok(resp)
}

async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> {
async fn add_bytes(&self, file_info: &FileInfo, mut bytes: Bytes) -> RustusResult<()> {
// In normal situation this `if` statement is not
// gonna be called, but what if it is ...
if file_info.path.is_none() {
let Some(path) = &file_info.path else {
return Err(RustusError::FileNotFound);
};
let file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(false)
.read(false)
.truncate(false)
.open(path.as_str())
.await?;
let mut writer = tokio::io::BufWriter::new(file);
writer.write_all(&bytes).await?;
writer.flush().await?;
if self.force_fsync {
writer.get_ref().sync_data().await?;
}
let path = file_info.path.as_ref().unwrap().clone();
let force_sync = self.force_fsync;
tokio::task::spawn_blocking(move || {
// Opening file in w+a mode.
// It means that we're going to append some
// bytes to the end of a file.
let file = OpenOptions::new()
.write(true)
.append(true)
.create(false)
.read(false)
.truncate(false)
.open(path.as_str())?;
let mut writer = BufWriter::new(file);
writer.write_all(bytes.as_ref())?;
writer.flush()?;
if force_sync {
writer.get_ref().sync_data()?;
}
Ok(())
})
.await?
writer.into_inner().shutdown().await?;
bytes.clear();
Ok(())
}

async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> {
// New path to file.
let file_path = self.data_file_path(file_info.id.as_str())?;
OpenOptions::new()
let mut opened = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.create_new(true)
.open(file_path.as_path())?;
.open(file_path.as_path())
.await?;
opened.shutdown().await?;
Ok(file_path.display().to_string())
}

Expand All @@ -129,54 +124,48 @@ impl Storage for FileStorage {
file_info: &FileInfo,
parts_info: Vec<FileInfo>,
) -> RustusResult<()> {
let force_fsync = self.force_fsync;
if file_info.path.is_none() {
let Some(path) = &file_info.path else {
return Err(RustusError::FileNotFound);
}
let path = file_info.path.as_ref().unwrap().clone();
tokio::task::spawn_blocking(move || {
let file = OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(path)?;
let mut writer = BufWriter::new(file);
for part in parts_info {
if part.path.is_none() {
return Err(RustusError::FileNotFound);
}
let part_file = OpenOptions::new()
.read(true)
.open(part.path.as_ref().unwrap())?;
let mut reader = BufReader::new(part_file);
copy(&mut reader, &mut writer)?;
}
writer.flush()?;
if force_fsync {
writer.get_ref().sync_data()?;
};
let file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(path)
.await?;
let mut writer = tokio::io::BufWriter::new(file);
for part in parts_info {
if part.path.is_none() {
return Err(RustusError::FileNotFound);
}
Ok(())
})
.await?
let part_file = tokio::fs::OpenOptions::new()
.read(true)
.open(part.path.as_ref().unwrap())
.await?;
let mut reader = tokio::io::BufReader::new(part_file);
tokio::io::copy_buf(&mut reader, &mut writer).await?;
reader.shutdown().await?;
}
writer.flush().await?;
if self.force_fsync {
writer.get_ref().sync_data().await?;
}
writer.into_inner().shutdown().await?;
Ok(())
}

async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
let info = file_info.clone();
if info.path.is_none() {
let Some(path) = &file_info.path else {
return Err(RustusError::FileNotFound);
};
let data_path = PathBuf::from(path);
if !data_path.exists() {
return Err(RustusError::FileNotFound);
}
tokio::task::spawn_blocking(move || {
// Let's remove the file itself.
let data_path = PathBuf::from(info.path.as_ref().unwrap().clone());
if !data_path.exists() {
return Err(RustusError::FileNotFound);
}
remove_file(data_path).map_err(|err| {
error!("{:?}", err);
RustusError::UnableToRemove(info.id.clone())
})?;
Ok(())
})
.await?
tokio::fs::remove_file(data_path).await.map_err(|err| {
tracing::error!("{:?}", err);
RustusError::UnableToRemove(String::from(path.as_str()))
})?;
Ok(())
}
}
Loading
Loading