diff --git a/changelog.d/21313-new-relic-logs-attributes.fix.md b/changelog.d/21313-new-relic-logs-attributes.fix.md new file mode 100644 index 0000000000000..ef90184ca715a --- /dev/null +++ b/changelog.d/21313-new-relic-logs-attributes.fix.md @@ -0,0 +1,4 @@ +Previously, when the `new_relic` sink sent non-standard event fields to the logs +API, it would include those fields beside the standard event fields (i.e. +`message` and `timestamp`). Now, any such fields are sent in an `attributes` +object, as specified by the New Relic logs API documentation. diff --git a/src/sinks/new_relic/mod.rs b/src/sinks/new_relic/mod.rs index eaf956ac45e64..b8a3a7d0169e7 100644 --- a/src/sinks/new_relic/mod.rs +++ b/src/sinks/new_relic/mod.rs @@ -5,13 +5,13 @@ mod model; mod service; mod sink; -pub use config::*; -pub use encoding::*; -pub use model::*; -pub use service::*; -pub use sink::*; +use config::*; +use encoding::*; +use model::*; +use service::*; +use sink::*; -pub use super::{Healthcheck, VectorSink}; +use super::{Healthcheck, VectorSink}; #[cfg(test)] -pub mod tests; +mod tests; diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index 376ecdfff0ea5..7874ada5da9ba 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -7,31 +7,35 @@ use std::{ use chrono::{DateTime, Utc}; use ordered_float::NotNan; -use serde::{Deserialize, Serialize}; -use vector_lib::event::ObjectMap; +use serde::Serialize; use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL}; +use vector_lib::{config::log_schema, event::ObjectMap}; use vrl::event_path; use super::NewRelicSinkError; -use crate::event::{Event, KeyString, MetricKind, MetricValue, Value}; +use crate::event::{Event, MetricKind, MetricValue, Value}; #[derive(Debug)] -pub enum NewRelicApiModel { +pub(super) enum NewRelicApiModel { Metrics(MetricsApiModel), Events(EventsApiModel), Logs(LogsApiModel), } -type DataStore = HashMap>; +/// The metrics API data model. +/// +/// Reference: https://docs.newrelic.com/docs/data-apis/ingest-apis/metric-api/report-metrics-metric-api/ +#[derive(Debug, Serialize)] +pub(super) struct MetricsApiModel(pub [MetricDataStore; 1]); -#[derive(Serialize, Deserialize, Debug)] -pub struct MetricsApiModel(pub Vec); +#[derive(Debug, Serialize)] +pub(super) struct MetricDataStore { + pub metrics: Vec, +} impl MetricsApiModel { - pub fn new(metric_array: Vec) -> Self { - let mut metric_store = DataStore::new(); - metric_store.insert("metrics".into(), metric_array); - Self(vec![metric_store]) + pub(super) fn new(metrics: Vec) -> Self { + Self([MetricDataStore { metrics }]) } } @@ -144,11 +148,14 @@ impl TryFrom> for MetricsApiModel { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct EventsApiModel(pub Vec); +/// The events API data mode. +/// +/// Reference: https://docs.newrelic.com/docs/data-apis/ingest-apis/event-api/introduction-event-api/ +#[derive(Debug, Serialize)] +pub(super) struct EventsApiModel(pub Vec); impl EventsApiModel { - pub fn new(events_array: Vec) -> Self { + pub(super) fn new(events_array: Vec) -> Self { Self(events_array) } } @@ -238,14 +245,35 @@ impl TryFrom> for EventsApiModel { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct LogsApiModel(pub Vec); +/// The logs API data model. +/// +/// Reference: https://docs.newrelic.com/docs/logs/log-api/introduction-log-api/ +#[derive(Serialize, Debug)] +pub(super) struct LogsApiModel(pub [LogDataStore; 1]); + +#[derive(Serialize, Debug)] +pub(super) struct LogDataStore { + pub logs: Vec, +} + +#[derive(Debug, PartialEq, Serialize)] +pub(super) struct LogMessage { + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, + pub attributes: ObjectMap, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(untagged)] +pub(super) enum Timestamp { + Numeric(i64), + String(String), +} impl LogsApiModel { - pub fn new(logs_array: Vec) -> Self { - let mut logs_store = DataStore::new(); - logs_store.insert("logs".into(), logs_array); - Self(vec![logs_store]) + pub(super) fn new(logs: Vec) -> Self { + Self([LogDataStore { logs }]) } } @@ -255,15 +283,20 @@ impl TryFrom> for LogsApiModel { fn try_from(buf_events: Vec) -> Result { let mut num_non_log_events = 0; let mut num_non_object_events = 0; + let message_key = log_schema().message_key_target_path().unwrap(); + let timestamp_key = log_schema().timestamp_key_target_path().unwrap(); - let logs_array: Vec = buf_events + let logs_array: Vec = buf_events .into_iter() .filter_map(|event| { - let Some(log) = event.try_into_log() else { + let Some(mut log) = event.try_into_log() else { num_non_log_events += 1; return None; }; + let message = get_message_string(log.remove(message_key)); + let timestamp = log.remove(timestamp_key).and_then(map_timestamp_value); + // We convert the log event into a logs API model simply by transmuting the type // wrapper and dropping all arrays, which are not supported by the API. We could // flatten out the keys, as this is what New Relic does internally, and we used to @@ -272,16 +305,17 @@ impl TryFrom> for LogsApiModel { // broken attributes in New Relic, and nesting objects is actually a (slightly) more // efficient representation of the key names. let (value, _metadata) = log.into_parts(); - let Some(mut obj) = value.into_object() else { + let Some(mut attributes) = value.into_object() else { num_non_object_events += 1; return None; }; - strip_arrays(&mut obj); - if !obj.contains_key("message") { - obj.insert("message".into(), Value::from("log from vector".to_owned())); - } + strip_arrays(&mut attributes); - Some(obj) + Some(LogMessage { + message, + timestamp, + attributes, + }) }) .collect(); @@ -306,6 +340,32 @@ impl TryFrom> for LogsApiModel { } } +const MILLISECONDS: f64 = 1000.0; + +/// Convert a value into a timestamp value. New Relic accepts either milliseconds or seconds since +/// epoch as an integer, or ISO8601-formatted timestamp as a string. +/// +/// Reference: https://docs.newrelic.com/docs/logs/log-api/introduction-log-api/#json-logs +fn map_timestamp_value(value: Value) -> Option { + match value { + Value::Timestamp(t) => Some(Timestamp::Numeric(t.timestamp_millis())), + Value::Integer(n) => Some(Timestamp::Numeric(n)), + Value::Float(f) => Some(Timestamp::Numeric((f.into_inner() * MILLISECONDS) as i64)), + Value::Bytes(b) => Some(Timestamp::String( + String::from_utf8_lossy(b.as_ref()).into(), + )), + _ => None, + } +} + +fn get_message_string(value: Option) -> String { + match value { + Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes.as_ref()).into(), + Some(value) => value.to_string(), + None => "log from vector".to_string(), + } +} + fn strip_arrays(obj: &mut ObjectMap) { obj.retain(|_key, value| !value.is_array()); obj.iter_mut().for_each(|(_key, value)| { diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs index 66be6888ec984..0c82141776b4c 100644 --- a/src/sinks/new_relic/tests.rs +++ b/src/sinks/new_relic/tests.rs @@ -3,8 +3,9 @@ use std::{convert::TryFrom, num::NonZeroU32, time::SystemTime}; use chrono::{DateTime, Utc}; use futures::{future::ready, stream}; use serde::Deserialize; +use serde_json::{json, to_value}; use vector_lib::config::{init_telemetry, Tags, Telemetry}; -use vrl::{btreemap, value}; +use vrl::value; use super::*; use crate::{ @@ -80,12 +81,12 @@ fn generates_event_api_model_without_message_field() { EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); assert_eq!( - &model.0[..], - &[btreemap! { - "eventType" => "TestEvent", - "user" => "Joe", - "user_id" => 123456, - }] + to_value(&model).unwrap(), + json!([{ + "eventType": "TestEvent", + "user": "Joe", + "user_id": 123456, + }]) ); } @@ -101,13 +102,13 @@ fn generates_event_api_model_with_message_field() { EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); assert_eq!( - &model.0[..], - &[btreemap! { - "eventType" =>"TestEvent", - "user" =>"Joe", - "user_id" =>123456, - "message" =>"This is a message", - }] + to_value(&model).unwrap(), + json!([{ + "eventType": "TestEvent", + "user": "Joe", + "user_id": 123456, + "message": "This is a message", + }]) ); } @@ -123,13 +124,13 @@ fn generates_event_api_model_with_json_inside_message_field() { EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); assert_eq!( - &model.0[..], - &[btreemap! { - "eventType" =>"TestEvent", - "user" =>"Joe", - "user_id" =>123456, - "my_key" =>"my_value", - }] + to_value(&model).unwrap(), + json!([{ + "eventType": "TestEvent", + "user": "Joe", + "user_id": 123456, + "my_key": "my_value", + }]) ); } @@ -137,14 +138,15 @@ fn generates_event_api_model_with_json_inside_message_field() { fn generates_log_api_model_without_message_field() { let event = Event::Log(LogEvent::from(value!({"tag_key": "tag_value"}))); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); - let logs = model.0[0].get("logs").expect("Logs data store not present"); assert_eq!( - &logs[..], - &[btreemap! { - "tag_key" =>"tag_value", - "message" =>"log from vector", - }] + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "log from vector", + "attributes": {"tag_key": "tag_value"}, + }] + }]) ); } @@ -155,14 +157,15 @@ fn generates_log_api_model_with_message_field() { "message": "This is a message", }))); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); - let logs = model.0[0].get("logs").expect("Logs data store not present"); assert_eq!( - &logs[..], - &[btreemap! { - "tag_key" =>"tag_value", - "message" =>"This is a message", - }] + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "This is a message", + "attributes": {"tag_key": "tag_value"}, + }] + }]) ); } @@ -174,15 +177,40 @@ fn generates_log_api_model_with_dotted_fields() { "three": sub, }))); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); - let logs = model.0[0].get("logs").expect("Logs data store not present"); assert_eq!( - &logs[..], - &[btreemap! { - "one.two" =>1, - "three" =>btreemap! {"four" =>2,}, - "message" =>"log from vector", - }] + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "log from vector", + "attributes": { + "one.two": 1, + "three": {"four": 2}, + }, + }] + }]) + ); +} + +#[test] +fn generates_log_api_model_with_timestamp() { + let stamp = DateTime::::from(SystemTime::now()); + let event = Event::Log(LogEvent::from(value!({ + "timestamp": stamp, + "tag_key": "tag_value", + "message": "This is a message", + }))); + let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); + + assert_eq!( + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "This is a message", + "timestamp": stamp.timestamp_millis(), + "attributes": {"tag_key": "tag_value"}, + }] + }]) ); } @@ -195,18 +223,18 @@ fn generates_metric_api_model_without_timestamp() { )); let model = MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); - let metrics = model.0[0] - .get("metrics") - .expect("Metric data store not present"); + let metrics = &model.0[0].metrics; assert_eq!( - &metrics[..], - &[btreemap! { - "name" =>"my_metric", - "value" =>100.0, - "timestamp" =>metrics[0].get("timestamp").unwrap().clone(), - "type" =>"gauge", - }] + to_value(&model).unwrap(), + json!([{ + "metrics": [{ + "name": "my_metric", + "value": 100.0, + "timestamp": metrics[0].get("timestamp").unwrap().clone(), + "type": "gauge", + }] + }]) ); } @@ -222,18 +250,17 @@ fn generates_metric_api_model_with_timestamp() { let event = Event::Metric(m); let model = MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); - let metrics = model.0[0] - .get("metrics") - .expect("Metric data store not present"); assert_eq!( - &metrics[..], - &[btreemap! { - "name" =>"my_metric", - "value" =>100.0, - "timestamp" =>stamp.timestamp(), - "type" =>"gauge", - }] + to_value(&model).unwrap(), + json!([{ + "metrics": [{ + "name": "my_metric", + "value": 100.0, + "timestamp": stamp.timestamp(), + "type": "gauge", + }] + }]) ); } @@ -250,18 +277,17 @@ fn generates_metric_api_model_incremental_counter() { let event = Event::Metric(m); let model = MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); - let metrics = model.0[0] - .get("metrics") - .expect("Metric data store not present"); assert_eq!( - &metrics[..], - &[btreemap! { - "name" =>"my_metric", - "value" =>100.0, - "interval.ms" =>1000, - "timestamp" =>stamp.timestamp(), - "type" =>"count", - }] + to_value(&model).unwrap(), + json!([{ + "metrics": [{ + "name": "my_metric", + "value": 100.0, + "interval.ms": 1000, + "timestamp": stamp.timestamp(), + "type": "count", + }] + }]) ); }