Skip to content

Commit

Permalink
Fix all breaking changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeschastny committed Nov 18, 2024
1 parent 0e0fe41 commit f20df80
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 52 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ http-body-util = { version = "0.1.1", optional = true }
aws-types = { version = "1", optional = true }
lambda_runtime = { version = "0", optional = true }
paste = { version = "1.0.14", optional = true }
hyper-util = "0.1.9"

[dev-dependencies]
assert2 = "0.3"
Expand Down
84 changes: 34 additions & 50 deletions src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

use std::{collections::HashMap, str::FromStr};

use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use opentelemetry::trace::TraceError;
use opentelemetry_http::hyper::HyperClient;
use opentelemetry_otlp::{ExportConfig, Protocol, SpanExporterBuilder};
use opentelemetry_otlp::{
ExportConfig, Protocol, SpanExporter, WithExportConfig, WithHttpConfig,
};
use opentelemetry_sdk::{
trace::{Sampler, TracerProvider},
runtime,
trace::{Config as TraceConfig, Sampler, TracerProvider},
Resource,
};
use std::time::Duration;
Expand All @@ -19,8 +21,8 @@ use crate::util;

#[must_use]
pub fn identity(
v: opentelemetry_otlp::OtlpTracePipeline<SpanExporterBuilder>,
) -> opentelemetry_otlp::OtlpTracePipeline<SpanExporterBuilder> {
v: opentelemetry_sdk::trace::Builder,
) -> opentelemetry_sdk::trace::Builder {
v
}

Expand All @@ -30,46 +32,41 @@ pub fn init_tracer<F>(
transform: F,
) -> Result<TracerProvider, TraceError>
where
F: FnOnce(
opentelemetry_otlp::OtlpTracePipeline<SpanExporterBuilder>,
) -> opentelemetry_otlp::OtlpTracePipeline<SpanExporterBuilder>,
F: FnOnce(opentelemetry_sdk::trace::Builder) -> opentelemetry_sdk::trace::Builder,
{
use opentelemetry_otlp::WithExportConfig;

let (maybe_protocol, maybe_endpoint, maybe_timeout) = read_export_config_from_env();
let export_config = infer_export_config(
maybe_protocol.as_deref(),
maybe_endpoint.as_deref(),
maybe_timeout.as_deref(),
)?;
tracing::debug!(target: "otel::setup", export_config = format!("{export_config:?}"));
let exporter: SpanExporterBuilder = match export_config.protocol {
Protocol::HttpBinary => opentelemetry_otlp::new_exporter()
.http()
.with_http_client(HyperClient::new_with_timeout(
Client::builder(TokioExecutor::new()).build_http(),
let exporter: SpanExporter = match export_config.protocol {
Protocol::HttpBinary => SpanExporter::builder()
.with_http()
.with_http_client(HyperClient::with_default_connector(
export_config.timeout,
None,
))
.with_headers(read_headers_from_env())
.with_export_config(export_config)
.into(),
Protocol::Grpc => opentelemetry_otlp::new_exporter()
.tonic()
.build()?,
Protocol::Grpc => SpanExporter::builder()
.with_tonic()
.with_export_config(export_config)
.into(),
.build()?,
Protocol::HttpJson => unreachable!("HttpJson protocol is not supported"),
};

let mut pipeline = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
let tracer_provider_builder = TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_config(
TraceConfig::default()
.with_resource(resource)
.with_sampler(read_sampler_from_env()),
);
pipeline = transform(pipeline);
pipeline.install_batch(opentelemetry_sdk::runtime::Tokio)

Ok(transform(tracer_provider_builder).build())
}

/// turn a string of "k1=v1,k2=v2,..." into an iterator of (key, value) tuples
Expand Down Expand Up @@ -157,12 +154,6 @@ fn infer_export_config(
},
};

let endpoint = match protocol {
Protocol::HttpBinary => maybe_endpoint.unwrap_or("http://localhost:4318"),
Protocol::Grpc => maybe_endpoint.unwrap_or("http://localhost:4317"),
Protocol::HttpJson => unreachable!("HttpJson protocol is not supported"),
};

let timeout = match maybe_timeout {
Some(millis) => Duration::from_millis(millis.parse::<u64>().map_err(|err| {
TraceError::from(format!("invalid timeout {millis:?} form env: {err}"))
Expand All @@ -173,7 +164,7 @@ fn infer_export_config(
};

Ok(ExportConfig {
endpoint: endpoint.to_owned(),
endpoint: maybe_endpoint.map(ToOwned::to_owned),
protocol,
timeout,
})
Expand All @@ -191,55 +182,48 @@ mod tests {
Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT);

#[rstest]
#[case(None, None, None, HttpBinary, "http://localhost:4318", TIMEOUT)]
#[case(
Some("http/protobuf"),
None,
None,
HttpBinary,
"http://localhost:4318",
TIMEOUT
)]
#[case(Some("http"), None, None, HttpBinary, "http://localhost:4318", TIMEOUT)]
#[case(Some("grpc"), None, None, Grpc, "http://localhost:4317", TIMEOUT)]
#[case(None, None, None, HttpBinary, None, TIMEOUT)]
#[case(Some("http/protobuf"), None, None, HttpBinary, None, TIMEOUT)]
#[case(Some("http"), None, None, HttpBinary, None, TIMEOUT)]
#[case(Some("grpc"), None, None, Grpc, None, TIMEOUT)]
#[case(
None,
Some("http://localhost:4317"),
None,
Grpc,
"http://localhost:4317",
Some("http://localhost:4317"),
TIMEOUT
)]
#[case(
Some("http/protobuf"),
Some("http://localhost:4318"),
None,
HttpBinary,
"http://localhost:4318",
Some("http://localhost:4318"),
TIMEOUT
)]
#[case(
Some("http/protobuf"),
Some("https://examples.com:4318"),
None,
HttpBinary,
"https://examples.com:4318",
Some("https://examples.com:4318"),
TIMEOUT
)]
#[case(
Some("http/protobuf"),
Some("https://examples.com:4317"),
Some("12345"),
HttpBinary,
"https://examples.com:4317",
Some("https://examples.com:4317"),
Duration::from_millis(12345)
)]
fn test_infer_export_config(
#[case] traces_protocol: Option<&str>,
#[case] traces_endpoint: Option<&str>,
#[case] traces_timeout: Option<&str>,
#[case] expected_protocol: Protocol,
#[case] expected_endpoint: &str,
#[case] expected_endpoint: Option<&str>,
#[case] expected_timeout: Duration,
) {
let ExportConfig {
Expand All @@ -250,7 +234,7 @@ mod tests {
.unwrap();

assert!(protocol == expected_protocol);
assert!(endpoint == expected_endpoint);
assert!(endpoint.as_deref() == expected_endpoint);
assert!(timeout == expected_timeout);
}
}

0 comments on commit f20df80

Please sign in to comment.