Skip to content

Commit

Permalink
fix(new_relic sink): Put log API attributes in separate structure (#2…
Browse files Browse the repository at this point in the history
…1313)

* fix(new_relic sink): Put log API attributes in separate structure

The New Relic log API specifies that all attributes other than `message` and
`timestamp` should go into an `attributes` field, which may contain any JSON
object data except for arrays. This change reworks the logs API model to move
all attributes from the log event into this attributes field as well as using
structures to encode fields with fixed names instead of using dynamic maps.

The tests have also been updated to compare the overall structure of the output
data model so as to ensure that the resulting encoding generated by `serde`
matches what we is specified in the New Relic API documentation.

* Add change log entry
  • Loading branch information
bruceg authored Sep 18, 2024
1 parent f83b47c commit 162d9b5
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 106 deletions.
4 changes: 4 additions & 0 deletions changelog.d/21313-new-relic-logs-attributes.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Previously, when the `new_relic` sink sent non-standard event fields to the logs
API, it would include those fields beside the standard event fields (i.e.
`message` and `timestamp`). Now, any such fields are sent in an `attributes`
object, as specified by the New Relic logs API documentation.
14 changes: 7 additions & 7 deletions src/sinks/new_relic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ mod model;
mod service;
mod sink;

pub use config::*;
pub use encoding::*;
pub use model::*;
pub use service::*;
pub use sink::*;
use config::*;
use encoding::*;
use model::*;
use service::*;
use sink::*;

pub use super::{Healthcheck, VectorSink};
use super::{Healthcheck, VectorSink};

#[cfg(test)]
pub mod tests;
mod tests;
116 changes: 88 additions & 28 deletions src/sinks/new_relic/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,35 @@ use std::{

use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use serde::{Deserialize, Serialize};
use vector_lib::event::ObjectMap;
use serde::Serialize;
use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
use vector_lib::{config::log_schema, event::ObjectMap};
use vrl::event_path;

use super::NewRelicSinkError;
use crate::event::{Event, KeyString, MetricKind, MetricValue, Value};
use crate::event::{Event, MetricKind, MetricValue, Value};

#[derive(Debug)]
pub enum NewRelicApiModel {
pub(super) enum NewRelicApiModel {
Metrics(MetricsApiModel),
Events(EventsApiModel),
Logs(LogsApiModel),
}

type DataStore = HashMap<KeyString, Vec<ObjectMap>>;
/// The metrics API data model.
///
/// Reference: https://docs.newrelic.com/docs/data-apis/ingest-apis/metric-api/report-metrics-metric-api/
#[derive(Debug, Serialize)]
pub(super) struct MetricsApiModel(pub [MetricDataStore; 1]);

#[derive(Serialize, Deserialize, Debug)]
pub struct MetricsApiModel(pub Vec<DataStore>);
#[derive(Debug, Serialize)]
pub(super) struct MetricDataStore {
pub metrics: Vec<ObjectMap>,
}

impl MetricsApiModel {
pub fn new(metric_array: Vec<ObjectMap>) -> Self {
let mut metric_store = DataStore::new();
metric_store.insert("metrics".into(), metric_array);
Self(vec![metric_store])
pub(super) fn new(metrics: Vec<ObjectMap>) -> Self {
Self([MetricDataStore { metrics }])
}
}

Expand Down Expand Up @@ -144,11 +148,14 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct EventsApiModel(pub Vec<ObjectMap>);
/// The events API data mode.
///
/// Reference: https://docs.newrelic.com/docs/data-apis/ingest-apis/event-api/introduction-event-api/
#[derive(Debug, Serialize)]
pub(super) struct EventsApiModel(pub Vec<ObjectMap>);

impl EventsApiModel {
pub fn new(events_array: Vec<ObjectMap>) -> Self {
pub(super) fn new(events_array: Vec<ObjectMap>) -> Self {
Self(events_array)
}
}
Expand Down Expand Up @@ -238,14 +245,35 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct LogsApiModel(pub Vec<DataStore>);
/// The logs API data model.
///
/// Reference: https://docs.newrelic.com/docs/logs/log-api/introduction-log-api/
#[derive(Serialize, Debug)]
pub(super) struct LogsApiModel(pub [LogDataStore; 1]);

#[derive(Serialize, Debug)]
pub(super) struct LogDataStore {
pub logs: Vec<LogMessage>,
}

#[derive(Debug, PartialEq, Serialize)]
pub(super) struct LogMessage {
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<Timestamp>,
pub attributes: ObjectMap,
}

#[derive(Debug, PartialEq, Serialize)]
#[serde(untagged)]
pub(super) enum Timestamp {
Numeric(i64),
String(String),
}

impl LogsApiModel {
pub fn new(logs_array: Vec<ObjectMap>) -> Self {
let mut logs_store = DataStore::new();
logs_store.insert("logs".into(), logs_array);
Self(vec![logs_store])
pub(super) fn new(logs: Vec<LogMessage>) -> Self {
Self([LogDataStore { logs }])
}
}

Expand All @@ -255,15 +283,20 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut num_non_log_events = 0;
let mut num_non_object_events = 0;
let message_key = log_schema().message_key_target_path().unwrap();
let timestamp_key = log_schema().timestamp_key_target_path().unwrap();

let logs_array: Vec<ObjectMap> = buf_events
let logs_array: Vec<LogMessage> = buf_events
.into_iter()
.filter_map(|event| {
let Some(log) = event.try_into_log() else {
let Some(mut log) = event.try_into_log() else {
num_non_log_events += 1;
return None;
};

let message = get_message_string(log.remove(message_key));
let timestamp = log.remove(timestamp_key).and_then(map_timestamp_value);

// We convert the log event into a logs API model simply by transmuting the type
// wrapper and dropping all arrays, which are not supported by the API. We could
// flatten out the keys, as this is what New Relic does internally, and we used to
Expand All @@ -272,16 +305,17 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
// broken attributes in New Relic, and nesting objects is actually a (slightly) more
// efficient representation of the key names.
let (value, _metadata) = log.into_parts();
let Some(mut obj) = value.into_object() else {
let Some(mut attributes) = value.into_object() else {
num_non_object_events += 1;
return None;
};
strip_arrays(&mut obj);
if !obj.contains_key("message") {
obj.insert("message".into(), Value::from("log from vector".to_owned()));
}
strip_arrays(&mut attributes);

Some(obj)
Some(LogMessage {
message,
timestamp,
attributes,
})
})
.collect();

Expand All @@ -306,6 +340,32 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
}
}

const MILLISECONDS: f64 = 1000.0;

/// Convert a value into a timestamp value. New Relic accepts either milliseconds or seconds since
/// epoch as an integer, or ISO8601-formatted timestamp as a string.
///
/// Reference: https://docs.newrelic.com/docs/logs/log-api/introduction-log-api/#json-logs
fn map_timestamp_value(value: Value) -> Option<Timestamp> {
match value {
Value::Timestamp(t) => Some(Timestamp::Numeric(t.timestamp_millis())),
Value::Integer(n) => Some(Timestamp::Numeric(n)),
Value::Float(f) => Some(Timestamp::Numeric((f.into_inner() * MILLISECONDS) as i64)),
Value::Bytes(b) => Some(Timestamp::String(
String::from_utf8_lossy(b.as_ref()).into(),
)),
_ => None,
}
}

fn get_message_string(value: Option<Value>) -> String {
match value {
Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes.as_ref()).into(),
Some(value) => value.to_string(),
None => "log from vector".to_string(),
}
}

fn strip_arrays(obj: &mut ObjectMap) {
obj.retain(|_key, value| !value.is_array());
obj.iter_mut().for_each(|(_key, value)| {
Expand Down
Loading

0 comments on commit 162d9b5

Please sign in to comment.