Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: suggestions for code readability #2

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ humantime-serde = "1.1"
itertools = "0.13.0"
num_cpus = "1.15"
once_cell = "1.17.1"
opentelemetry-proto = "0.27.0"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Expand Down Expand Up @@ -106,7 +107,6 @@ prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"
tracing = "0.1.41"
opentelemetry-proto = "0.27.0"

[build-dependencies]
cargo_toml = "0.20.1"
Expand Down
85 changes: 40 additions & 45 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ use arrow_schema::Schema;
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use nom::AsBytes;
use opentelemetry_proto::tonic::logs::v1::LogsData;
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
use opentelemetry_proto::tonic::trace::v1::TracesData;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -112,23 +116,20 @@ pub async fn handle_otel_logs_ingestion(
req: HttpRequest,
body: Bytes,
) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel logs
let mut json = flatten_otel_logs(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body).await?;
}
} else {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
};
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel logs
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
let mut json = flatten_otel_logs(&logs);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body).await?;
}

Ok(HttpResponse::Ok().finish())
}

Expand All @@ -139,23 +140,20 @@ pub async fn handle_otel_metrics_ingestion(
req: HttpRequest,
body: Bytes,
) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel metrics
let mut json = flatten_otel_metrics(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body).await?;
}
} else {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
};
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel metrics
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
let mut json = flatten_otel_metrics(metrics);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body).await?;
}

Ok(HttpResponse::Ok().finish())
}

Expand All @@ -166,23 +164,20 @@ pub async fn handle_otel_traces_ingestion(
req: HttpRequest,
body: Bytes,
) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel traces
let mut json = flatten_otel_traces(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body).await?;
}
} else {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
};
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel traces
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
let mut json = flatten_otel_traces(&traces);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(&stream_name, &req, &body).await?;
}

Ok(HttpResponse::Ok().finish())
}

Expand Down
60 changes: 32 additions & 28 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use actix_web::HttpRequest;
use arrow_schema::Field;
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Utc};
use nom::AsBytes;
use opentelemetry_proto::tonic::{
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
};
use serde_json::Value;
use std::{
collections::{BTreeMap, HashMap},
Expand Down Expand Up @@ -47,39 +51,39 @@ pub async fn flatten_and_push_logs(
body: Bytes,
stream_name: &str,
) -> Result<(), PostError> {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),

//custom flattening required for otel logs
LOG_SOURCE_OTEL_LOGS => {
json = flatten_otel_logs(&body);
}

//custom flattening required for otel metrics
LOG_SOURCE_OTEL_METRICS => {
json = flatten_otel_metrics(&body);
}

//custom flattening required for otel traces
LOG_SOURCE_OTEL_TRACES => {
json = flatten_otel_traces(&body);
}
_ => {
tracing::warn!("Unknown log source: {}", log_source);
push_logs(stream_name, &req, &body).await?;
}
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
push_logs(stream_name, &req, &body).await?;
return Ok(());
};
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
match log_source.to_str().unwrap() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
//custom flattening required for otel logs
LOG_SOURCE_OTEL_LOGS => {
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
json = flatten_otel_logs(&logs);
}
//custom flattening required for otel metrics
LOG_SOURCE_OTEL_METRICS => {
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
json = flatten_otel_metrics(metrics);
}
//custom flattening required for otel traces
LOG_SOURCE_OTEL_TRACES => {
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
json = flatten_otel_traces(&traces);
}
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
log_source => {
tracing::warn!("Unknown log source: {}", log_source);
push_logs(stream_name, &req, &body).await?;
}
} else {
}

for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name, &req, &body).await?;
}

Ok(())
}

Expand Down
6 changes: 1 addition & 5 deletions src/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*/

use bytes::Bytes;
use opentelemetry_proto::tonic::logs::v1::LogRecord;
use opentelemetry_proto::tonic::logs::v1::LogsData;
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
Expand Down Expand Up @@ -125,11 +124,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<BTreeMap<String, Value>> {

/// this function performs the custom flattening of the otel logs
/// and returns a `Vec` of `BTreeMap` of the flattened json
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let body_str = std::str::from_utf8(body).unwrap();
let message: LogsData = serde_json::from_str(body_str).unwrap();
pub fn flatten_otel_logs(message: &LogsData) -> Vec<BTreeMap<String, Value>> {
let mut vec_otel_json = Vec::new();

for record in &message.resource_logs {
let mut resource_log_json = BTreeMap::new();

Expand Down
5 changes: 1 addition & 4 deletions src/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use std::collections::BTreeMap;

use bytes::Bytes;
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue;
use opentelemetry_proto::tonic::metrics::v1::{
exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar,
Expand Down Expand Up @@ -386,9 +385,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<BTreeMap<String, V

/// this function performs the custom flattening of the otel metrics
/// and returns a `Vec` of `BTreeMap` of the flattened json
pub fn flatten_otel_metrics(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let body_str = std::str::from_utf8(body).unwrap();
let message: MetricsData = serde_json::from_str(body_str).unwrap();
pub fn flatten_otel_metrics(message: MetricsData) -> Vec<BTreeMap<String, Value>> {
let mut vec_otel_json = Vec::new();
for record in &message.resource_metrics {
let mut resource_metrics_json = BTreeMap::new();
Expand Down
6 changes: 1 addition & 5 deletions src/otel/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*
*/

use bytes::Bytes;

use opentelemetry_proto::tonic::trace::v1::span::Event;
use opentelemetry_proto::tonic::trace::v1::span::Link;
use opentelemetry_proto::tonic::trace::v1::ScopeSpans;
Expand Down Expand Up @@ -71,9 +69,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<BTreeMap<String, Value>> {

/// this function performs the custom flattening of the otel traces event
/// and returns a `Vec` of `BTreeMap` of the flattened json
pub fn flatten_otel_traces(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let body_str = std::str::from_utf8(body).unwrap();
let message: TracesData = serde_json::from_str(body_str).unwrap();
pub fn flatten_otel_traces(message: &TracesData) -> Vec<BTreeMap<String, Value>> {
let mut vec_otel_json = Vec::new();

for record in &message.resource_spans {
Expand Down
Loading