refactor: LogSource enum
de-sh committed Jan 4, 2025
1 parent 74ddc86 commit a32e4b3
Showing 7 changed files with 142 additions and 72 deletions.
4 changes: 2 additions & 2 deletions src/event/format/json.rs
@@ -29,7 +29,7 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

-use super::{EventFormat, Metadata, Tags};
+use super::{EventFormat, LogSource, Metadata, Tags};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
@@ -52,7 +52,7 @@ impl EventFormat for Event {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
-log_source: &str,
+log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(
self.data,
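Both to_data and into_recordbatch now take &LogSource in place of the old &str. As a minimal sketch of what an updated call site looks like (hypothetical and simplified; it mirrors the internal-stream call in src/handlers/http/ingest.rs further below, with event and schema assumed in scope):

// Hypothetical caller after this refactor: the source is a typed variant
// rather than a raw string such as "kinesis".
let (rb, is_first) = event.into_recordbatch(
    &schema,
    None,                // static_schema_flag
    None,                // time_partition
    SchemaVersion::V0,
    &LogSource::Kinesis, // was: "kinesis"
)?;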
36 changes: 34 additions & 2 deletions src/event/format/mod.rs
@@ -43,6 +43,38 @@ type Tags = String;
type Metadata = String;
type EventSchema = Vec<Arc<Field>>;

+/// Source of the logs, used to perform special processing for certain sources
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
+pub enum LogSource {
+    // AWS Kinesis sends logs in the format of a JSON array
+    Kinesis,
+    // OpenTelemetry sends logs according to the specification as explained here
+    // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
+    OtelLogs,
+    // OpenTelemetry sends metrics according to the specification as explained here
+    // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
+    OtelMetrics,
+    // OpenTelemetry sends traces according to the specification as explained here
+    // https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/trace/v1
+    OtelTraces,
+    // JSON object or array
+    #[default]
+    Json,
+    Custom(String),
+}
+
+impl From<&str> for LogSource {
+    fn from(s: &str) -> Self {
+        match s {
+            "kinesis" => LogSource::Kinesis,
+            "otel-logs" => LogSource::OtelLogs,
+            "otel-metrics" => LogSource::OtelMetrics,
+            "otel-traces" => LogSource::OtelTraces,
+            custom => LogSource::Custom(custom.to_owned()),
+        }
+    }
+}
+
// Global Trait for event format
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
@@ -54,7 +86,7 @@
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
-log_source: &str,
+log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -65,7 +97,7 @@
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
-log_source: &str,
+log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema,
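The From<&str> conversion is deliberately infallible: the four known header values map to their variants, and anything else is carried along as Custom rather than rejected, while the derived Default makes plain JSON the fallback when no source is given. A short illustrative sketch of that behavior (not part of the commit):

assert_eq!(LogSource::from("kinesis"), LogSource::Kinesis);
assert_eq!(LogSource::from("otel-traces"), LogSource::OtelTraces);

// Unknown values are preserved as Custom instead of producing an error:
assert_eq!(
    LogSource::from("my-agent"),
    LogSource::Custom("my-agent".to_owned())
);

// The derived Default selects plain JSON:
assert_eq!(LogSource::default(), LogSource::Json);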
100 changes: 70 additions & 30 deletions src/handlers/http/ingest.rs
@@ -20,16 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
+use crate::event::format::LogSource;
use crate::event::{
self,
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
-use crate::handlers::{
-    LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES,
-    STREAM_NAME_HEADER_KEY,
-};
+use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::{SchemaVersion, STREAM_INFO};
use crate::option::{Mode, CONFIG};
@@ -95,7 +93,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
metadata: String::default(),
};
// For internal streams, use old schema
-event.into_recordbatch(&schema, None, None, SchemaVersion::V0, "")?
+event.into_recordbatch(&schema, None, None, SchemaVersion::V0, &LogSource::default())?
};
event::Event {
rb,
@@ -127,8 +125,8 @@ pub async fn handle_otel_logs_ingestion(
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
};
-let log_source = log_source.to_str().unwrap();
-if log_source != LOG_SOURCE_OTEL_LOGS {
+let log_source = LogSource::from(log_source.to_str().unwrap());
+if log_source != LogSource::OtelLogs {
return Err(PostError::Invalid(anyhow::anyhow!(
"Please use x-p-log-source: otel-logs for ingesting otel logs"
)));
@@ -142,7 +140,7 @@
let mut json = flatten_otel_logs(&logs);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
-push_logs(&stream_name, &req, &body, log_source).await?;
+push_logs(&stream_name, &req, &body, &log_source).await?;
}

Ok(HttpResponse::Ok().finish())
@@ -161,8 +159,8 @@
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
};
-let log_source = log_source.to_str().unwrap();
-if log_source != LOG_SOURCE_OTEL_METRICS {
+let log_source = LogSource::from(log_source.to_str().unwrap());
+if log_source != LogSource::OtelMetrics {
return Err(PostError::Invalid(anyhow::anyhow!(
"Please use x-p-log-source: otel-metrics for ingesting otel metrics"
)));
@@ -175,7 +173,7 @@
let mut json = flatten_otel_metrics(metrics);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
-push_logs(&stream_name, &req, &body, log_source).await?;
+push_logs(&stream_name, &req, &body, &log_source).await?;
}

Ok(HttpResponse::Ok().finish())
@@ -195,8 +193,8 @@
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
};
-let log_source = log_source.to_str().unwrap();
-if log_source != LOG_SOURCE_OTEL_TRACES {
+let log_source = LogSource::from(log_source.to_str().unwrap());
+if log_source != LogSource::OtelTraces {
return Err(PostError::Invalid(anyhow::anyhow!(
"Please use x-p-log-source: otel-traces for ingesting otel traces"
)));
@@ -209,7 +207,7 @@
let mut json = flatten_otel_traces(&traces);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
-push_logs(&stream_name, &req, &body, log_source).await?;
+push_logs(&stream_name, &req, &body, &log_source).await?;
}

Ok(HttpResponse::Ok().finish())
@@ -371,7 +369,7 @@ mod tests {
use serde_json::json;

use crate::{
-event,
+event::{self, format::LogSource},
handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS},
metadata::SchemaVersion,
};
@@ -420,7 +418,7 @@
None,
None,
SchemaVersion::V0,
-"",
+&LogSource::default(),
)
.unwrap();

@@ -471,7 +469,7 @@
None,
None,
SchemaVersion::V0,
-"",
+&LogSource::default(),
)
.unwrap();

@@ -505,8 +503,16 @@

let req = TestRequest::default().to_http_request();

-let (rb, _) =
-    into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
+let (rb, _) = into_event_batch(
+    &req,
+    &json,
+    schema,
+    None,
+    None,
+    SchemaVersion::V0,
+    &LogSource::default(),
+)
+.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
@@ -538,7 +544,16 @@

let req = TestRequest::default().to_http_request();

-assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
+assert!(into_event_batch(
+    &req,
+    &json,
+    schema,
+    None,
+    None,
+    SchemaVersion::V0,
+    &LogSource::default()
+)
+.is_err());
}

#[test]
@@ -556,8 +571,16 @@

let req = TestRequest::default().to_http_request();

-let (rb, _) =
-    into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
+let (rb, _) = into_event_batch(
+    &req,
+    &json,
+    schema,
+    None,
+    None,
+    SchemaVersion::V0,
+    &LogSource::default(),
+)
+.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
@@ -576,7 +599,7 @@
None,
None,
SchemaVersion::V0,
-""
+&LogSource::default()
)
.is_err())
}
@@ -608,7 +631,7 @@
None,
None,
SchemaVersion::V0,
-"",
+&LogSource::default(),
)
.unwrap();

@@ -665,7 +688,7 @@
None,
None,
SchemaVersion::V0,
-"",
+&LogSource::default(),
)
.unwrap();

@@ -715,8 +738,16 @@
);
let req = TestRequest::default().to_http_request();

-let (rb, _) =
-    into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
+let (rb, _) = into_event_batch(
+    &req,
+    &json,
+    schema,
+    None,
+    None,
+    SchemaVersion::V0,
+    &LogSource::default(),
+)
+.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
@@ -765,7 +796,16 @@
.into_iter(),
);

-assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
+assert!(into_event_batch(
+    &req,
+    &json,
+    schema,
+    None,
+    None,
+    SchemaVersion::V0,
+    &LogSource::default()
+)
+.is_err());
}

#[test]
@@ -800,7 +840,7 @@
None,
None,
SchemaVersion::V0,
-"",
+&LogSource::default(),
)
.unwrap();

@@ -881,7 +921,7 @@
None,
None,
SchemaVersion::V1,
-"",
+&LogSource::default(),
)
.unwrap();

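All three OTEL handlers now follow the same pattern: read x-p-log-source, convert it with LogSource::from, and compare enum variants instead of raw string constants. A hedged sketch of how that repeated check could be factored into one helper (the helper name and the String error type are assumptions, not part of this commit):

use actix_web::HttpRequest;

use crate::event::format::LogSource;
use crate::handlers::LOG_SOURCE_KEY;

// Parse the x-p-log-source header and verify it names the variant this
// endpoint expects; the parsed source is returned so the caller can pass
// it on to push_logs.
fn expect_log_source(req: &HttpRequest, expected: LogSource) -> Result<LogSource, String> {
    let value = req
        .headers()
        .get(LOG_SOURCE_KEY)
        .ok_or_else(|| "missing x-p-log-source header".to_string())?;
    let source = LogSource::from(value.to_str().map_err(|e| e.to_string())?);
    if source != expected {
        return Err(format!("expected {:?}, got {:?}", expected, source));
    }
    Ok(source)
}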
