From 162d9b5267d8a3f82deb6015fcee2a7c28b5a08b Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Wed, 18 Sep 2024 15:46:56 -0600 Subject: [PATCH] fix(new_relic sink): Put log API attributes in separate structure (#21313) * fix(new_relic sink): Put log API attributes in separate structure The New Relic log API specifies that all attributes other than `message` and `timestamp` should go into an `attributes` field, which may contain any JSON object data except for arrays. This change reworks the logs API model to move all attributes from the log event into this attributes field as well as using structures to encode fields with fixed names instead of using dynamic maps. The tests have also been updated to compare the overall structure of the output data model so as to ensure that the resulting encoding generated by `serde` matches what is specified in the New Relic API documentation. * Add change log entry --- .../21313-new-relic-logs-attributes.fix.md | 4 + src/sinks/new_relic/mod.rs | 14 +- src/sinks/new_relic/model.rs | 116 +++++++++--- src/sinks/new_relic/tests.rs | 168 ++++++++++-------- 4 files changed, 196 insertions(+), 106 deletions(-) create mode 100644 changelog.d/21313-new-relic-logs-attributes.fix.md diff --git a/changelog.d/21313-new-relic-logs-attributes.fix.md b/changelog.d/21313-new-relic-logs-attributes.fix.md new file mode 100644 index 0000000000000..ef90184ca715a --- /dev/null +++ b/changelog.d/21313-new-relic-logs-attributes.fix.md @@ -0,0 +1,4 @@ +Previously, when the `new_relic` sink sent non-standard event fields to the logs +API, it would include those fields beside the standard event fields (i.e. +`message` and `timestamp`). Now, any such fields are sent in an `attributes` +object, as specified by the New Relic logs API documentation. 
diff --git a/src/sinks/new_relic/mod.rs b/src/sinks/new_relic/mod.rs index eaf956ac45e64..b8a3a7d0169e7 100644 --- a/src/sinks/new_relic/mod.rs +++ b/src/sinks/new_relic/mod.rs @@ -5,13 +5,13 @@ mod model; mod service; mod sink; -pub use config::*; -pub use encoding::*; -pub use model::*; -pub use service::*; -pub use sink::*; +use config::*; +use encoding::*; +use model::*; +use service::*; +use sink::*; -pub use super::{Healthcheck, VectorSink}; +use super::{Healthcheck, VectorSink}; #[cfg(test)] -pub mod tests; +mod tests; diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index 376ecdfff0ea5..7874ada5da9ba 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -7,31 +7,35 @@ use std::{ use chrono::{DateTime, Utc}; use ordered_float::NotNan; -use serde::{Deserialize, Serialize}; -use vector_lib::event::ObjectMap; +use serde::Serialize; use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL}; +use vector_lib::{config::log_schema, event::ObjectMap}; use vrl::event_path; use super::NewRelicSinkError; -use crate::event::{Event, KeyString, MetricKind, MetricValue, Value}; +use crate::event::{Event, MetricKind, MetricValue, Value}; #[derive(Debug)] -pub enum NewRelicApiModel { +pub(super) enum NewRelicApiModel { Metrics(MetricsApiModel), Events(EventsApiModel), Logs(LogsApiModel), } -type DataStore = HashMap>; +/// The metrics API data model. 
+/// +/// Reference: https://docs.newrelic.com/docs/data-apis/ingest-apis/metric-api/report-metrics-metric-api/ +#[derive(Debug, Serialize)] +pub(super) struct MetricsApiModel(pub [MetricDataStore; 1]); -#[derive(Serialize, Deserialize, Debug)] -pub struct MetricsApiModel(pub Vec); +#[derive(Debug, Serialize)] +pub(super) struct MetricDataStore { + pub metrics: Vec, +} impl MetricsApiModel { - pub fn new(metric_array: Vec) -> Self { - let mut metric_store = DataStore::new(); - metric_store.insert("metrics".into(), metric_array); - Self(vec![metric_store]) + pub(super) fn new(metrics: Vec) -> Self { + Self([MetricDataStore { metrics }]) } } @@ -144,11 +148,14 @@ impl TryFrom> for MetricsApiModel { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct EventsApiModel(pub Vec); +/// The events API data model. +/// +/// Reference: https://docs.newrelic.com/docs/data-apis/ingest-apis/event-api/introduction-event-api/ +#[derive(Debug, Serialize)] +pub(super) struct EventsApiModel(pub Vec); impl EventsApiModel { - pub fn new(events_array: Vec) -> Self { + pub(super) fn new(events_array: Vec) -> Self { Self(events_array) } } @@ -238,14 +245,35 @@ impl TryFrom> for EventsApiModel { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct LogsApiModel(pub Vec); +/// The logs API data model. 
+/// +/// Reference: https://docs.newrelic.com/docs/logs/log-api/introduction-log-api/ +#[derive(Serialize, Debug)] +pub(super) struct LogsApiModel(pub [LogDataStore; 1]); + +#[derive(Serialize, Debug)] +pub(super) struct LogDataStore { + pub logs: Vec, +} + +#[derive(Debug, PartialEq, Serialize)] +pub(super) struct LogMessage { + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, + pub attributes: ObjectMap, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(untagged)] +pub(super) enum Timestamp { + Numeric(i64), + String(String), +} impl LogsApiModel { - pub fn new(logs_array: Vec) -> Self { - let mut logs_store = DataStore::new(); - logs_store.insert("logs".into(), logs_array); - Self(vec![logs_store]) + pub(super) fn new(logs: Vec) -> Self { + Self([LogDataStore { logs }]) } } @@ -255,15 +283,20 @@ impl TryFrom> for LogsApiModel { fn try_from(buf_events: Vec) -> Result { let mut num_non_log_events = 0; let mut num_non_object_events = 0; + let message_key = log_schema().message_key_target_path().unwrap(); + let timestamp_key = log_schema().timestamp_key_target_path().unwrap(); - let logs_array: Vec = buf_events + let logs_array: Vec = buf_events .into_iter() .filter_map(|event| { - let Some(log) = event.try_into_log() else { + let Some(mut log) = event.try_into_log() else { num_non_log_events += 1; return None; }; + let message = get_message_string(log.remove(message_key)); + let timestamp = log.remove(timestamp_key).and_then(map_timestamp_value); + // We convert the log event into a logs API model simply by transmuting the type // wrapper and dropping all arrays, which are not supported by the API. We could // flatten out the keys, as this is what New Relic does internally, and we used to @@ -272,16 +305,17 @@ impl TryFrom> for LogsApiModel { // broken attributes in New Relic, and nesting objects is actually a (slightly) more // efficient representation of the key names. 
let (value, _metadata) = log.into_parts(); - let Some(mut obj) = value.into_object() else { + let Some(mut attributes) = value.into_object() else { num_non_object_events += 1; return None; }; - strip_arrays(&mut obj); - if !obj.contains_key("message") { - obj.insert("message".into(), Value::from("log from vector".to_owned())); - } + strip_arrays(&mut attributes); - Some(obj) + Some(LogMessage { + message, + timestamp, + attributes, + }) }) .collect(); @@ -306,6 +340,32 @@ impl TryFrom> for LogsApiModel { } } +const MILLISECONDS: f64 = 1000.0; + +/// Convert a value into a timestamp value. New Relic accepts either milliseconds or seconds since +/// epoch as an integer, or ISO8601-formatted timestamp as a string. +/// +/// Reference: https://docs.newrelic.com/docs/logs/log-api/introduction-log-api/#json-logs +fn map_timestamp_value(value: Value) -> Option { + match value { + Value::Timestamp(t) => Some(Timestamp::Numeric(t.timestamp_millis())), + Value::Integer(n) => Some(Timestamp::Numeric(n)), + Value::Float(f) => Some(Timestamp::Numeric((f.into_inner() * MILLISECONDS) as i64)), + Value::Bytes(b) => Some(Timestamp::String( + String::from_utf8_lossy(b.as_ref()).into(), + )), + _ => None, + } +} + +fn get_message_string(value: Option) -> String { + match value { + Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes.as_ref()).into(), + Some(value) => value.to_string(), + None => "log from vector".to_string(), + } +} + fn strip_arrays(obj: &mut ObjectMap) { obj.retain(|_key, value| !value.is_array()); obj.iter_mut().for_each(|(_key, value)| { diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs index 66be6888ec984..0c82141776b4c 100644 --- a/src/sinks/new_relic/tests.rs +++ b/src/sinks/new_relic/tests.rs @@ -3,8 +3,9 @@ use std::{convert::TryFrom, num::NonZeroU32, time::SystemTime}; use chrono::{DateTime, Utc}; use futures::{future::ready, stream}; use serde::Deserialize; +use serde_json::{json, to_value}; use 
vector_lib::config::{init_telemetry, Tags, Telemetry}; -use vrl::{btreemap, value}; +use vrl::value; use super::*; use crate::{ @@ -80,12 +81,12 @@ fn generates_event_api_model_without_message_field() { EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); assert_eq!( - &model.0[..], - &[btreemap! { - "eventType" => "TestEvent", - "user" => "Joe", - "user_id" => 123456, - }] + to_value(&model).unwrap(), + json!([{ + "eventType": "TestEvent", + "user": "Joe", + "user_id": 123456, + }]) ); } @@ -101,13 +102,13 @@ fn generates_event_api_model_with_message_field() { EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); assert_eq!( - &model.0[..], - &[btreemap! { - "eventType" =>"TestEvent", - "user" =>"Joe", - "user_id" =>123456, - "message" =>"This is a message", - }] + to_value(&model).unwrap(), + json!([{ + "eventType": "TestEvent", + "user": "Joe", + "user_id": 123456, + "message": "This is a message", + }]) ); } @@ -123,13 +124,13 @@ fn generates_event_api_model_with_json_inside_message_field() { EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); assert_eq!( - &model.0[..], - &[btreemap! { - "eventType" =>"TestEvent", - "user" =>"Joe", - "user_id" =>123456, - "my_key" =>"my_value", - }] + to_value(&model).unwrap(), + json!([{ + "eventType": "TestEvent", + "user": "Joe", + "user_id": 123456, + "my_key": "my_value", + }]) ); } @@ -137,14 +138,15 @@ fn generates_event_api_model_with_json_inside_message_field() { fn generates_log_api_model_without_message_field() { let event = Event::Log(LogEvent::from(value!({"tag_key": "tag_value"}))); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); - let logs = model.0[0].get("logs").expect("Logs data store not present"); assert_eq!( - &logs[..], - &[btreemap! 
{ - "tag_key" =>"tag_value", - "message" =>"log from vector", - }] + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "log from vector", + "attributes": {"tag_key": "tag_value"}, + }] + }]) ); } @@ -155,14 +157,15 @@ fn generates_log_api_model_with_message_field() { "message": "This is a message", }))); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); - let logs = model.0[0].get("logs").expect("Logs data store not present"); assert_eq!( - &logs[..], - &[btreemap! { - "tag_key" =>"tag_value", - "message" =>"This is a message", - }] + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "This is a message", + "attributes": {"tag_key": "tag_value"}, + }] + }]) ); } @@ -174,15 +177,40 @@ fn generates_log_api_model_with_dotted_fields() { "three": sub, }))); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); - let logs = model.0[0].get("logs").expect("Logs data store not present"); assert_eq!( - &logs[..], - &[btreemap! { - "one.two" =>1, - "three" =>btreemap! 
{"four" =>2,}, - "message" =>"log from vector", - }] + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "log from vector", + "attributes": { + "one.two": 1, + "three": {"four": 2}, + }, + }] + }]) + ); +} + +#[test] +fn generates_log_api_model_with_timestamp() { + let stamp = DateTime::::from(SystemTime::now()); + let event = Event::Log(LogEvent::from(value!({ + "timestamp": stamp, + "tag_key": "tag_value", + "message": "This is a message", + }))); + let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); + + assert_eq!( + to_value(&model).unwrap(), + json!([{ + "logs": [{ + "message": "This is a message", + "timestamp": stamp.timestamp_millis(), + "attributes": {"tag_key": "tag_value"}, + }] + }]) ); } @@ -195,18 +223,18 @@ fn generates_metric_api_model_without_timestamp() { )); let model = MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); - let metrics = model.0[0] - .get("metrics") - .expect("Metric data store not present"); + let metrics = &model.0[0].metrics; assert_eq!( - &metrics[..], - &[btreemap! { - "name" =>"my_metric", - "value" =>100.0, - "timestamp" =>metrics[0].get("timestamp").unwrap().clone(), - "type" =>"gauge", - }] + to_value(&model).unwrap(), + json!([{ + "metrics": [{ + "name": "my_metric", + "value": 100.0, + "timestamp": metrics[0].get("timestamp").unwrap().clone(), + "type": "gauge", + }] + }]) ); } @@ -222,18 +250,17 @@ fn generates_metric_api_model_with_timestamp() { let event = Event::Metric(m); let model = MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); - let metrics = model.0[0] - .get("metrics") - .expect("Metric data store not present"); assert_eq!( - &metrics[..], - &[btreemap! 
{ - "name" =>"my_metric", - "value" =>100.0, - "timestamp" =>stamp.timestamp(), - "type" =>"gauge", - }] + to_value(&model).unwrap(), + json!([{ + "metrics": [{ + "name": "my_metric", + "value": 100.0, + "timestamp": stamp.timestamp(), + "type": "gauge", + }] + }]) ); } @@ -250,18 +277,17 @@ fn generates_metric_api_model_incremental_counter() { let event = Event::Metric(m); let model = MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); - let metrics = model.0[0] - .get("metrics") - .expect("Metric data store not present"); assert_eq!( - &metrics[..], - &[btreemap! { - "name" =>"my_metric", - "value" =>100.0, - "interval.ms" =>1000, - "timestamp" =>stamp.timestamp(), - "type" =>"count", - }] + to_value(&model).unwrap(), + json!([{ + "metrics": [{ + "name": "my_metric", + "value": 100.0, + "interval.ms": 1000, + "timestamp": stamp.timestamp(), + "type": "count", + }] + }]) ); }