Skip to content

Commit

Permalink
Fix http/protobuf exporter by supplying a client
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeschastny committed Jan 31, 2024
1 parent 5460df6 commit 45e650d
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 29 deletions.
95 changes: 78 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ opentelemetry = { version = "0.21.0", default-features = false, features = [
"trace",
] }
tracing-opentelemetry = "0.22"
opentelemetry-http = "0.10.0"
opentelemetry-http = { version = "0.10.0", optional = true, features = ["hyper", "tokio"] }
opentelemetry-otlp = { version = "0.14", optional = true, features = ["http-proto"] }
opentelemetry-zipkin = { version = "0.19", features = [], optional = true }
opentelemetry_sdk = { version = "0.21", default-features = false, features = [
Expand All @@ -34,21 +34,23 @@ tower = { version = "0.4", optional = true }
axum = { version = "0.7.4", optional = true }
pin-project-lite = { version = "0.2", optional = true }
futures-util = { version = "0.3", default_features = false, features = [], optional = true }
hyper = { version = "1.1.0", default-features = false, features = ["http1", "client"], optional = true }
hyper = { version = "0.14", default-features = false, features = ["http1", "client"], optional = true }
hyper-v1 = { package = "hyper", version = "1.1", default-features = false, features = ["http1", "client"], optional = true }
http-body-util = { version = "0.1.0", optional = true }

[dev-dependencies]
assert2 = "0.3"
rstest = "0.18"

[features]
full = ["integration_test"]
full = ["test"]
default = ["otlp", "zipkin"]
zipkin = ["dep:opentelemetry-zipkin"]
otlp = ["opentelemetry-otlp/http-proto", "tracer", "dep:tracing-opentelemetry-instrumentation-sdk"]
otlp = ["opentelemetry-otlp/http-proto", "tracer", "dep:tracing-opentelemetry-instrumentation-sdk", "dep:opentelemetry-http", "dep:hyper"]
tracer = ["dep:opentelemetry-semantic-conventions"]
integration_test = ["axum", "dep:serde", "dep:serde_json", "dep:opentelemetry_api", "dep:rand", "dep:hyper", "dep:http-body-util"]
test = ["axum", "dep:serde", "dep:serde_json", "dep:opentelemetry_api", "dep:rand", "dep:hyper"]
axum = ["dep:axum", "dep:tower", "dep:futures-util", "dep:pin-project-lite", "dep:tracing-opentelemetry-instrumentation-sdk"]
hyper-v1 = ["dep:hyper-v1", "dep:http-body-util"]

[profile.dev]
lto = false
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use tracing_opentelemetry_instrumentation_sdk;
#[cfg(feature = "otlp")]
pub mod otlp;

#[cfg(feature = "integration_test")]
#[cfg(feature = "test")]
pub mod test;

mod filter;
Expand Down
6 changes: 6 additions & 0 deletions src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
use std::{collections::HashMap, str::FromStr};

use opentelemetry::trace::TraceError;
use opentelemetry_http::hyper::HyperClient;
use opentelemetry_otlp::SpanExporterBuilder;
use opentelemetry_sdk::{
trace::{Sampler, Tracer},
Resource,
};
use std::time::Duration;
use tracing::Level;

#[must_use]
Expand All @@ -36,6 +38,10 @@ where
let exporter: SpanExporterBuilder = match protocol.as_str() {
"http/protobuf" => opentelemetry_otlp::new_exporter()
.http()
.with_http_client(HyperClient::new_with_timeout(
hyper::Client::new(),
Duration::from_millis(1500), // TODO: make configurable
))
.with_endpoint(endpoint)
.with_headers(read_headers_from_env())
.into(),
Expand Down
32 changes: 26 additions & 6 deletions src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,54 @@
pub mod jaegar;

#[cfg(feature = "hyper-v1")]
use http_body_util::BodyExt;
use hyper::{header::HeaderValue, HeaderMap, Response};
#[cfg(not(feature = "hyper-v1"))]
use hyper::{body::Bytes, header::HeaderValue, HeaderMap, Result};
#[cfg(feature = "hyper-v1")]
use hyper_v1::{header::HeaderValue, HeaderMap, Result};

pub use opentelemetry_api::trace::{SpanId, TraceId};
use rand::Rng;

#[cfg(feature = "hyper-v1")]
type Response = hyper_v1::Response<hyper_v1::body::Incoming>;
#[cfg(not(feature = "hyper-v1"))]
type Response = hyper::Response<hyper::Body>;

#[derive(Debug)]
pub struct TracedResponse {
resp: Response<hyper::body::Incoming>,
resp: Response,
pub trace_id: TraceId,
pub span_id: SpanId,
}

impl TracedResponse {
pub fn new(resp: Response<hyper::body::Incoming>, traceparent: Traceparent) -> Self {
pub fn new(resp: Response, traceparent: Traceparent) -> Self {
Self {
resp,
trace_id: traceparent.trace_id,
span_id: traceparent.span_id,
}
}

#[cfg(feature = "axum")]
pub async fn into_axum_bytes(self) -> hyper::Result<axum::body::Bytes> {
#[cfg(not(feature = "hyper-v1"))]
pub async fn get_bytes(&mut self) -> Result<Bytes> {
hyper::body::to_bytes(self.body_mut()).await
}

#[cfg(not(feature = "hyper-v1"))]
pub async fn into_bytes(self) -> Result<Bytes> {
hyper::body::to_bytes(self.resp).await
}

#[cfg(all(feature = "axum", feature = "hyper-v1"))]
pub async fn into_axum_bytes(self) -> Result<axum::body::Bytes> {
Ok(self.resp.into_body().collect().await?.to_bytes())
}
}

impl std::ops::Deref for TracedResponse {
type Target = Response<hyper::body::Incoming>;
type Target = Response;

fn deref(&self) -> &Self::Target {
&self.resp
Expand Down

0 comments on commit 45e650d

Please sign in to comment.