feat: add log source to stream info #1092

Merged · 1 commit · Jan 15, 2025
8 changes: 6 additions & 2 deletions src/event/format/mod.rs
@@ -26,6 +26,7 @@ use anyhow::{anyhow, Error as AnyError};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
+use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::{metadata::SchemaVersion, utils::arrow::get_field};
@@ -38,7 +39,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
type EventSchema = Vec<Arc<Field>>;

/// Source of the logs, used to perform special processing for certain sources
-#[derive(Default, Debug, Clone, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LogSource {
// AWS Kinesis sends logs in the format of a json array
Kinesis,
@@ -51,6 +52,8 @@ pub enum LogSource {
// OpenTelemetry sends traces according to the specification as explained here
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
OtelTraces,
+// Internal Stream format
+Pmeta,
#[default]
// Json object or array
Json,
@@ -64,7 +67,8 @@ impl From<&str> for LogSource {
"otel-logs" => LogSource::OtelLogs,
"otel-metrics" => LogSource::OtelMetrics,
"otel-traces" => LogSource::OtelTraces,
-custom => LogSource::Custom(custom.to_owned()),
+"pmeta" => LogSource::Pmeta,
+_ => LogSource::Json,
}
}
}
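A note for reviewers: the sketch below shows unit tests that could sit alongside this enum to pin down the new mapping and the serde behavior. It is not part of this diff; it assumes `serde_json` is available to the test profile (the crate already depends on it) and that the variants use serde's default externally tagged representation, i.e. unit variants serialize as their names.

```rust
#[cfg(test)]
mod log_source_tests {
    use super::LogSource;

    #[test]
    fn maps_known_and_unknown_header_values() {
        // Known names map to their variants.
        assert_eq!(LogSource::from("pmeta"), LogSource::Pmeta);
        assert_eq!(LogSource::from("otel-traces"), LogSource::OtelTraces);
        // Anything unrecognized now falls back to Json, where it
        // previously became LogSource::Custom(<value>).
        assert_eq!(LogSource::from("nginx"), LogSource::Json);
    }

    #[test]
    fn round_trips_through_serde() {
        // With the plain derive, a unit variant serializes as its name.
        let json = serde_json::to_string(&LogSource::Pmeta).unwrap();
        assert_eq!(json, r#""Pmeta""#);
        let back: LogSource = serde_json::from_str(&json).unwrap();
        assert_eq!(back, LogSource::Pmeta);
    }
}
```

Worth flagging: the new catch-all arm silently coerces unknown values, including former `Custom` source strings, to `Json`.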
30 changes: 26 additions & 4 deletions src/handlers/http/ingest.rs
@@ -68,7 +68,12 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
stream_name
)));
}
-create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+create_stream_if_not_exists(
+    &stream_name,
+    &StreamType::UserDefined.to_string(),
+    LogSource::default(),
+)
+.await?;

flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
@@ -130,7 +135,12 @@ pub async fn handle_otel_logs_ingestion(
}

let stream_name = stream_name.to_str().unwrap().to_owned();
-create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+create_stream_if_not_exists(
+    &stream_name,
+    &StreamType::UserDefined.to_string(),
+    LogSource::OtelLogs,
+)
+.await?;

//custom flattening required for otel logs
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
@@ -163,7 +173,12 @@ pub async fn handle_otel_metrics_ingestion(
)));
}
let stream_name = stream_name.to_str().unwrap().to_owned();
-create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+create_stream_if_not_exists(
+    &stream_name,
+    &StreamType::UserDefined.to_string(),
+    LogSource::OtelMetrics,
+)
+.await?;

//custom flattening required for otel metrics
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
@@ -197,7 +212,12 @@ pub async fn handle_otel_traces_ingestion(
)));
}
let stream_name = stream_name.to_str().unwrap().to_owned();
-create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+create_stream_if_not_exists(
+    &stream_name,
+    &StreamType::UserDefined.to_string(),
+    LogSource::OtelTraces,
+)
+.await?;

//custom flattening required for otel traces
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
@@ -264,6 +284,7 @@ pub async fn push_logs_unchecked(
pub async fn create_stream_if_not_exists(
stream_name: &str,
stream_type: &str,
+log_source: LogSource,
) -> Result<bool, PostError> {
let mut stream_exists = false;
if STREAM_INFO.stream_exists(stream_name) {
@@ -288,6 +309,7 @@
false,
Arc::new(Schema::empty()),
stream_type,
+log_source,
)
.await?;

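For context, a sketch (not in this PR) of how a call site uses the widened helper; the stream name is illustrative, everything else is taken from this file. Per the function body above, the returned `bool` reports whether the stream already existed.

```rust
use crate::event::format::LogSource;
use crate::handlers::http::ingest::{create_stream_if_not_exists, PostError};
use crate::storage::StreamType;

/// Returns true if the stream already existed, false if this call just
/// created it, tagged with the given log source.
async fn ensure_demo_stream() -> Result<bool, PostError> {
    // Plain HTTP ingestion passes LogSource::default() (i.e. Json); the
    // OTEL handlers above pin OtelLogs / OtelMetrics / OtelTraces instead,
    // so the source recorded at creation reflects the path the first
    // event arrived on.
    create_stream_if_not_exists(
        "demo_stream",
        &StreamType::UserDefined.to_string(),
        LogSource::default(),
    )
    .await
}
```

Threading the source through creation, rather than patching stream metadata after the fact, also lets internal streams pin `LogSource::Pmeta` up front, as `create_internal_stream_if_not_exists` does in logstream.rs below.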
58 changes: 52 additions & 6 deletions src/handlers/http/logstream.rs
@@ -26,7 +26,7 @@ use super::modal::utils::logstream_utils::{
use super::query::update_schema_when_distributed;
use crate::alerts::Alerts;
use crate::catalog::get_first_event;
-use crate::event::format::override_data_type;
+use crate::event::format::{override_data_type, LogSource};
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::{SchemaVersion, STREAM_INFO};
@@ -35,8 +35,8 @@ use crate::option::{Mode, CONFIG};
use crate::rbac::role::Action;
use crate::rbac::Users;
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
-use crate::storage::StreamType;
-use crate::storage::{retention::Retention, StorageDir, StreamInfo};
+use crate::storage::{retention::Retention, StorageDir};
+use crate::storage::{StreamInfo, StreamType};
use crate::utils::actix::extract_session_key_from_req;
use crate::{event, stats};

@@ -484,6 +484,7 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

+#[allow(clippy::too_many_arguments)]
pub async fn create_stream(
stream_name: String,
time_partition: &str,
@@ -492,6 +493,7 @@
static_schema_flag: bool,
schema: Arc<Schema>,
stream_type: &str,
+log_source: LogSource,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal.to_string() {
@@ -509,6 +511,7 @@
static_schema_flag,
schema.clone(),
stream_type,
+log_source.clone(),
)
.await
{
@@ -533,6 +536,7 @@
static_schema,
stream_type,
SchemaVersion::V1, // New stream
+log_source,
);
}
Err(err) => {
@@ -583,6 +587,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
.map(|limit| limit.to_string()),
custom_partition: stream_meta.custom_partition.clone(),
static_schema_flag: stream_meta.static_schema_flag,
+log_source: stream_meta.log_source.clone(),
};

// get the other info from
@@ -725,8 +730,12 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
}

pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
-if let Ok(stream_exists) =
-    create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
+if let Ok(stream_exists) = create_stream_if_not_exists(
+    INTERNAL_STREAM_NAME,
+    &StreamType::Internal.to_string(),
+    LogSource::Pmeta,
+)
+.await
{
if stream_exists {
return Ok(());
@@ -894,9 +903,9 @@ pub mod error {
mod tests {
use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::http::logstream::get_stats;
+use crate::handlers::http::modal::utils::logstream_utils::fetch_headers_from_put_stream_request;
use actix_web::test::TestRequest;
use anyhow::bail;

#[actix_web::test]
#[should_panic]
async fn get_stats_panics_without_logstream() {
@@ -915,4 +924,41 @@
_ => bail!("expected StreamNotFound error"),
}
}

+    #[actix_web::test]
+    async fn header_without_log_source() {
+        let req = TestRequest::default().to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Json);
+    }
+
+    #[actix_web::test]
+    async fn header_with_known_log_source() {
+        let mut req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "pmeta"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Pmeta);
+
+        req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "otel-logs"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);
+
+        req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "kinesis"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
+    }
+
+    #[actix_web::test]
+    async fn header_with_unknown_log_source() {
+        let req = TestRequest::default()
+            .insert_header(("X-P-Log-Source", "teststream"))
+            .to_http_request();
+        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
+        assert_eq!(log_source, crate::event::format::LogSource::Json);
+    }
}
21 changes: 17 additions & 4 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -24,10 +24,11 @@ use bytes::Bytes;
use http::StatusCode;

use crate::{
+event::format::LogSource,
handlers::{
http::logstream::error::{CreateStreamError, StreamError},
-CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
-TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
+CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
+TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
},
metadata::{self, SchemaVersion, STREAM_INFO},
option::{Mode, CONFIG},
@@ -48,6 +49,7 @@
static_schema_flag,
update_stream_flag,
stream_type,
+log_source,
) = fetch_headers_from_put_stream_request(req);

if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag {
@@ -113,6 +115,7 @@
static_schema_flag,
schema,
&stream_type,
+log_source,
)
.await?;

@@ -167,13 +170,14 @@

pub fn fetch_headers_from_put_stream_request(
req: &HttpRequest,
-) -> (String, String, String, bool, bool, String) {
+) -> (String, String, String, bool, bool, String, LogSource) {
let mut time_partition = String::default();
let mut time_partition_limit = String::default();
let mut custom_partition = String::default();
let mut static_schema_flag = false;
let mut update_stream_flag = false;
let mut stream_type = StreamType::UserDefined.to_string();
+let mut log_source = LogSource::default();
req.headers().iter().for_each(|(key, value)| {
if key == TIME_PARTITION_KEY {
time_partition = value.to_str().unwrap().to_string();
@@ -193,6 +197,9 @@
if key == STREAM_TYPE_KEY {
stream_type = value.to_str().unwrap().to_string();
}
+if key == LOG_SOURCE_KEY {
+    log_source = LogSource::from(value.to_str().unwrap());
+}
});

(
@@ -202,6 +209,7 @@
static_schema_flag,
update_stream_flag,
stream_type,
+log_source,
)
}
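Every caller now destructures a 7-tuple from this function. As a sketch (not part of the diff), one more header-mapping case in the same style as the new tests in logstream.rs, which could live in this module:

```rust
#[cfg(test)]
mod log_source_header_tests {
    use super::fetch_headers_from_put_stream_request;
    use crate::event::format::LogSource;
    use actix_web::test::TestRequest;

    #[actix_web::test]
    async fn otel_metrics_header_maps_to_its_variant() {
        let req = TestRequest::default()
            .insert_header(("X-P-Log-Source", "otel-metrics"))
            .to_http_request();
        // log_source is the seventh element of the widened tuple.
        let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req);
        assert_eq!(log_source, LogSource::OtelMetrics);
    }
}
```

At seven positional fields the tuple is near the readability limit; a small headers struct could be a reasonable follow-up.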

@@ -378,6 +386,7 @@ pub async fn update_custom_partition_in_stream(
Ok(())
}

+#[allow(clippy::too_many_arguments)]
pub async fn create_stream(
stream_name: String,
time_partition: &str,
@@ -386,6 +395,7 @@
static_schema_flag: bool,
schema: Arc<Schema>,
stream_type: &str,
+log_source: LogSource,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal.to_string() {
@@ -403,6 +413,7 @@
static_schema_flag,
schema.clone(),
stream_type,
+log_source.clone(),
)
.await
{
@@ -427,6 +438,7 @@
static_schema,
stream_type,
SchemaVersion::V1, // New stream
+log_source,
);
}
Err(err) => {
@@ -476,7 +488,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
let static_schema_flag = stream_metadata.static_schema_flag;
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
let schema_version = stream_metadata.schema_version;

+let log_source = stream_metadata.log_source;
metadata::STREAM_INFO.add_stream(
stream_name.to_string(),
stream_metadata.created_at,
Expand All @@ -487,6 +499,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
static_schema,
stream_type,
schema_version,
+log_source,
);
} else {
return Ok(false);
8 changes: 7 additions & 1 deletion src/kafka.rs
@@ -34,6 +34,7 @@ use std::{collections::HashMap, fmt::Debug};
use tracing::{debug, error, info, warn};

use crate::audit::AuditLogBuilder;
+use crate::event::format::LogSource;
use crate::option::CONFIG;
use crate::{
event::{
@@ -180,7 +181,12 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
let stream_name = msg.topic();

// stream should get created only if there is an incoming event, not before that
-create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
+create_stream_if_not_exists(
+    stream_name,
+    &StreamType::UserDefined.to_string(),
+    LogSource::default(),
+)
+.await?;

let schema = resolve_schema(stream_name)?;
let event = format::json::Event {
Expand Down