Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otel upgrade #1818

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### New features
- Add reverse functions and tests for arrays and strings, add sort test for arrays
- Add `compression` field in `otel` connectors for compression support on payloads.

## [0.12.4]

Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ tonic = { version = "0.6.1", default-features = false, features = [
] }
prost = "0.10.4"
prost-types = "0.9.0"
tremor-otelapis = { version = "0.2.4" }
tremor-otelapis = { version = "0.2.5" }

# aws-s3
aws-sdk-s3 = "0.15.0"
Expand Down
27 changes: 23 additions & 4 deletions src/connectors/impls/otel/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{common::OtelDefaults, logs, metrics, trace};
use super::{
common::{Compression, OtelDefaults},
logs, metrics, trace,
};
use crate::connectors::prelude::*;
use tonic::transport::Channel as TonicChannel;
use tonic::transport::Endpoint as TonicEndpoint;
Expand All @@ -39,6 +42,9 @@ pub(crate) struct Config {
/// Enables the metrics service
#[serde(default = "default_true")]
pub(crate) metrics: bool,
/// Configurable compression for otel payloads
#[serde(default = "Default::default")]
pub(crate) compression: Compression,
}

impl ConfigImpl for Config {}
Expand Down Expand Up @@ -127,10 +133,23 @@ impl Sink for OtelSink {
.connect()
.await?;

let logs_client = LogsServiceClient::new(channel.clone());
let metrics_client = MetricsServiceClient::new(channel.clone());
let trace_client = TraceServiceClient::new(channel);

let (logs_client, metrics_client, trace_client) = match self.config.compression {
Compression::Gzip => (
logs_client.accept_gzip().send_gzip(),
metrics_client.accept_gzip().send_gzip(),
trace_client.accept_gzip().send_gzip(),
),
Compression::None => (logs_client, metrics_client, trace_client),
};

self.remote = Some(RemoteOpenTelemetryEndpoint {
logs_client: LogsServiceClient::new(channel.clone()),
metrics_client: MetricsServiceClient::new(channel.clone()),
trace_client: TraceServiceClient::new(channel),
logs_client,
metrics_client,
trace_client,
});

Ok(true)
Expand Down
9 changes: 9 additions & 0 deletions src/connectors/impls/otel/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ impl url::Defaults for OtelDefaults {
const PORT: u16 = 4317;
}

/// Enum to support configurable compression on otel grpc client/servers.
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum Compression {
mfelsche marked this conversation as resolved.
Show resolved Hide resolved
#[default]
None,
Gzip,
}

pub(crate) const EMPTY: Vec<Value> = Vec::new();

pub(crate) fn any_value_to_json(pb: AnyValue) -> Value<'static> {
Expand Down
55 changes: 48 additions & 7 deletions src/connectors/impls/otel/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{common::OtelDefaults, logs, metrics, trace};
use super::{
common::{Compression, OtelDefaults},
logs, metrics, trace,
};
use crate::connectors::prelude::*;
use async_std::channel::{bounded, Receiver, Sender};
use async_std::task::JoinHandle;
use tonic::transport::Server as GrpcServer;
use tremor_otelapis::all::{self, OpenTelemetryEvents};
use tremor_otelapis::opentelemetry::proto::collector::{
logs::v1::logs_service_server::LogsServiceServer,
metrics::v1::metrics_service_server::MetricsServiceServer,
trace::v1::trace_service_server::TraceServiceServer,
};

const CONNECTOR_TYPE: &str = "otel_server";

// TODO Consider concurrency cap?
Expand All @@ -34,6 +44,9 @@ pub(crate) struct Config {
/// Enables the metrics service
#[serde(default = "default_true")]
pub(crate) metrics: bool,
/// Configure grpc compression
#[serde(default = "Default::default")]
pub(crate) compression: Compression,
}

impl ConfigImpl for Config {}
Expand Down Expand Up @@ -134,12 +147,7 @@ impl Connector for Server {
previous_handle.cancel().await;
}

let tx = self.tx.clone();

spawn_task(
ctx.clone(),
async move { Ok(all::make(endpoint, tx).await?) },
);
self.serve(ctx, endpoint).await;
Ok(true)
}
}
Expand Down Expand Up @@ -191,6 +199,39 @@ impl Source for OtelSource {
}
}

impl Server {
async fn serve(&mut self, ctx: &ConnectorContext, addr: std::net::SocketAddr) {
let trace_server =
TraceServiceServer::new(all::TraceServiceForwarder::with_sender(self.tx.clone()));

let logs_server =
LogsServiceServer::new(all::LogsServiceForwarder::with_sender(self.tx.clone()));

let metrics_server =
MetricsServiceServer::new(all::MetricsServiceForwarder::with_sender(self.tx.clone()));

// set the compression on the server.
let (trace_server, logs_server, metrics_server) = match &self.config.compression {
Compression::Gzip => (
trace_server.accept_gzip().send_gzip(),
logs_server.accept_gzip().send_gzip(),
metrics_server.accept_gzip().send_gzip(),
),

Compression::None => (trace_server, logs_server, metrics_server),
};

spawn_task(ctx.clone(), async move {
Ok(GrpcServer::builder()
.add_service(trace_server)
.add_service(logs_server)
.add_service(metrics_server)
.serve(addr)
.await?)
});
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 5 additions & 0 deletions tremor-cli/tests/integration/otel-compression/assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
status: 0
name: otel connectors
asserts:
- source: out.log
equals_file: expected.json
74 changes: 74 additions & 0 deletions tremor-cli/tests/integration/otel-compression/config.troy
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
define flow server
flow
use integration;
use tremor::pipelines;
use tremor::connectors;

define connector otel_server from otel_server
with
config = {
"url": "127.0.0.1:4317",
"compression": "gzip",
}
end;

define pipeline instrument
into out, exit
pipeline
# recording
select event from in into out;

# quiescence
select { "exit": 0, "delay": 10 } from in
where present event.trace[0].instrumentation_library_spans[0].spans[0].attributes.`http.target`
into exit;
end;

create connector data_out from integration::write_file;
create connector exit from integration::exit;
create connector otels from otel_server;
create connector stdio from connectors::console;

create pipeline echo from instrument;
create pipeline passthrough from pipelines::passthrough;

# Echo otel server: <otel:req> -> server -> server_side -> <file>
connect /connector/otels to /pipeline/echo;
connect /connector/otels to /pipeline/passthrough;

connect /pipeline/passthrough to /connector/stdio;
connect /pipeline/echo to /connector/data_out;
connect /pipeline/echo/exit to /connector/exit;
end;

define flow client
flow
use integration;
use tremor::pipelines;

define connector otel_client from otel_client
with
config = {
"url": "127.0.0.1:4317",
"compression": "gzip",
},
reconnect = {
"retry": {
"interval_ms": 100,
"growth_rate": 2,
"max_retries": 3,
}
}
end;

create connector data_in from integration::read_file;
create connector otelc from otel_client;
create pipeline replay from pipelines::passthrough;

# Replay recorded events over otel client to server
connect /connector/data_in to /pipeline/replay;
connect /pipeline/replay to /connector/otelc;
end;

deploy flow server;
deploy flow client;
2 changes: 2 additions & 0 deletions tremor-cli/tests/integration/otel-compression/expected.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"trace":[{"instrumentation_library_spans":[{"instrumentation_library":{"name":"io.opentelemetry.jdbc","version":"1.11.0"},"schema_url":"","spans":[{"attributes":{"db.connection_string":"h2:mem:","db.name":"c3d75da4-c7e1-4f8e-9c7c-99b8da7fa791","db.statement":"DROP TABLE vet_specialties IF EXISTS","db.system":"h2","db.user":"sa","thread.id":1,"thread.name":"main"},"dropped_attributes_count":0,"dropped_events_count":0,"dropped_links_count":0,"end_time_unix_nano":1646054197371114000,"events":[],"kind":3,"links":[],"name":"c3d75da4-c7e1-4f8e-9c7c-99b8da7fa791","parent_span_id":"","span_id":"afe8d471fad0ef7c","start_time_unix_nano":1646054197366955000,"status":{"code":0,"deprecated_code":0,"message":""},"trace_id":"52c0d1949a4c908818f9e0d1fcfad17e","trace_state":""}]}],"resource":{"attributes":{"host.arch":"x86_64","host.name":"ALT05209","os.description":"Mac OS X 11.6.2","os.type":"darwin","process.command_line":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java -javaagent:./jolokia-jvm-1.7.1.jar=port=8000,host=localhost -javaagent:./opentelemetry-javaagent.jar -Dotel.java.agent.debug=true","process.executable.path":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java","process.pid":24855,"process.runtime.description":"Homebrew OpenJDK 64-Bit Server VM 11.0.12+0","process.runtime.name":"OpenJDK Runtime Environment","process.runtime.version":"11.0.12+0","service.name":"javaApp","telemetry.auto.version":"1.11.0","telemetry.sdk.language":"java","telemetry.sdk.name":"opentelemetry","telemetry.sdk.version":"1.11.0"},"dropped_attributes_count":0},"schema_url":"https://opentelemetry.io/schemas/1.8.0"}]}
{"trace":[{"instrumentation_library_spans":[{"instrumentation_library":{"name":"io.opentelemetry.tomcat-7.0","version":"1.11.0"},"schema_url":"","spans":[{"attributes":{"http.flavor":"1.1","http.host":"localhost:8090","http.method":"GET","http.route":"/owners","http.scheme":"http","http.server_name":"localhost","http.status_code":200,"http.target":"/owners?lastName=snot","http.user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36","net.peer.ip":"0:0:0:0:0:0:0:1","net.peer.name":"localhost","net.peer.port":60972,"net.transport":"ip_tcp","thread.id":245,"thread.name":"http-nio-8090-exec-7"},"dropped_attributes_count":0,"dropped_events_count":0,"dropped_links_count":0,"end_time_unix_nano":1646054328134855000,"events":[],"kind":2,"links":[],"name":"/owners","parent_span_id":"","span_id":"d647be1d9aa0ca5e","start_time_unix_nano":1646054328042871000,"status":{"code":0,"deprecated_code":0,"message":""},"trace_id":"90b0b8020de77bea3f8587b90585821d","trace_state":""}]}],"resource":{"attributes":{"host.arch":"x86_64","host.name":"ALT05209","os.description":"Mac OS X 11.6.2","os.type":"darwin","process.command_line":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java -javaagent:./jolokia-jvm-1.7.1.jar=port=8000,host=localhost -javaagent:./opentelemetry-javaagent.jar -Dotel.java.agent.debug=true","process.executable.path":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java","process.pid":24855,"process.runtime.description":"Homebrew OpenJDK 64-Bit Server VM 11.0.12+0","process.runtime.name":"OpenJDK Runtime Environment","process.runtime.version":"11.0.12+0","service.name":"javaApp","telemetry.auto.version":"1.11.0","telemetry.sdk.language":"java","telemetry.sdk.name":"opentelemetry","telemetry.sdk.version":"1.11.0"},"dropped_attributes_count":0},"schema_url":"https://opentelemetry.io/schemas/1.8.0"}]}
2 changes: 2 additions & 0 deletions tremor-cli/tests/integration/otel-compression/in.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"trace":[{"instrumentation_library_spans":[{"spans":[{"attributes":{"db.statement":"DROP TABLE vet_specialties IF EXISTS","db.connection_string":"h2:mem:","thread.name":"main","db.system":"h2","thread.id":1,"db.user":"sa","db.name":"c3d75da4-c7e1-4f8e-9c7c-99b8da7fa791"},"events":[],"links":[],"span_id":"afe8d471fad0ef7c","parent_span_id":"","trace_id":"52c0d1949a4c908818f9e0d1fcfad17e","start_time_unix_nano":1646054197366955000,"end_time_unix_nano":1646054197371114000,"trace_state":"","dropped_attributes_count":0,"dropped_events_count":0,"dropped_links_count":0,"status":{"code":0,"deprecated_code":0,"message":""},"kind":3,"name":"c3d75da4-c7e1-4f8e-9c7c-99b8da7fa791"}],"schema_url":"","instrumentation_library":{"name":"io.opentelemetry.jdbc","version":"1.11.0"}}],"schema_url":"https://opentelemetry.io/schemas/1.8.0","resource":{"attributes":{"os.description":"Mac OS X 11.6.2","os.type":"darwin","process.command_line":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java -javaagent:./jolokia-jvm-1.7.1.jar=port=8000,host=localhost -javaagent:./opentelemetry-javaagent.jar -Dotel.java.agent.debug=true","telemetry.sdk.language":"java","host.arch":"x86_64","telemetry.sdk.name":"opentelemetry","process.runtime.version":"11.0.12+0","service.name":"javaApp","process.pid":24855,"process.runtime.name":"OpenJDK Runtime Environment","telemetry.sdk.version":"1.11.0","process.runtime.description":"Homebrew OpenJDK 64-Bit Server VM 11.0.12+0","host.name":"ALT05209","process.executable.path":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java","telemetry.auto.version":"1.11.0"},"dropped_attributes_count":0}}]}
{"trace":[{"instrumentation_library_spans":[{"spans":[{"attributes":{"net.peer.ip":"0:0:0:0:0:0:0:1","http.target":"/owners?lastName=snot","http.route":"/owners","http.method":"GET","http.server_name":"localhost","thread.id":245,"http.user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36","net.peer.name":"localhost","net.transport":"ip_tcp","http.host":"localhost:8090","net.peer.port":60972,"http.flavor":"1.1","thread.name":"http-nio-8090-exec-7","http.status_code":200,"http.scheme":"http"},"events":[],"links":[],"span_id":"d647be1d9aa0ca5e","parent_span_id":"","trace_id":"90b0b8020de77bea3f8587b90585821d","start_time_unix_nano":1646054328042871000,"end_time_unix_nano":1646054328134855000,"trace_state":"","dropped_attributes_count":0,"dropped_events_count":0,"dropped_links_count":0,"status":{"code":0,"deprecated_code":0,"message":""},"kind":2,"name":"/owners"}],"schema_url":"","instrumentation_library":{"name":"io.opentelemetry.tomcat-7.0","version":"1.11.0"}}],"schema_url":"https://opentelemetry.io/schemas/1.8.0","resource":{"attributes":{"os.description":"Mac OS X 11.6.2","os.type":"darwin","process.command_line":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java -javaagent:./jolokia-jvm-1.7.1.jar=port=8000,host=localhost -javaagent:./opentelemetry-javaagent.jar -Dotel.java.agent.debug=true","telemetry.sdk.language":"java","host.arch":"x86_64","telemetry.sdk.name":"opentelemetry","process.runtime.version":"11.0.12+0","service.name":"javaApp","process.pid":24855,"process.runtime.name":"OpenJDK Runtime Environment","telemetry.sdk.version":"1.11.0","process.runtime.description":"Homebrew OpenJDK 64-Bit Server VM 11.0.12+0","host.name":"ALT05209","process.executable.path":"/usr/local/Cellar/openjdk@11/11.0.12/libexec/openjdk.jdk/Contents/Home:bin:java","telemetry.auto.version":"1.11.0"},"dropped_attributes_count":0}}]}
9 changes: 9 additions & 0 deletions tremor-cli/tests/integration/otel-compression/tags.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[
"opentelemetry",
"otel",
"otlp",
"echo",
"connectors",
"connector",
"compression"
]