chore(deps): Bump Rust version to 1.80 (#20949)
* chore(deps): Bump Rust version to 1.80

* Sort integration test feature flags

* Fix feature flags on internal events

* Fix config test using `file_descriptor` source

* Make `file_descriptor` feature flag match the others' use of underscore

* Add null FD fix to file_descriptor config generator too

* Install python3-venv in build images

* Revert "Install python3-venv in build images"

This reverts commit a342978.

Was committing on the wrong branch

* Fix issue with `file_descriptor` source test

* Fix greptimedb feature flag typo

* Re-fix kafka config code

* Fix greptimedb feature

* Fix config::cmd::output_has_consistent_ordering test

* Remove bogus `unix` config guard
bruceg authored Oct 11, 2024
1 parent 22c23c3 commit 31dc38f
Showing 43 changed files with 136 additions and 82 deletions.
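Most of the mechanical churn in this commit traces back to Rust 1.80 stabilizing Cargo's check-cfg support: `rustc` now warns about `cfg` names and feature values it has not been told about, which is what surfaced the misspelled feature flags fixed above. A minimal sketch of the interaction, using this repository's custom `cfg(tokio_unstable)` flag (warning text approximate):

// src/lib.rs
// Under Rust 1.80, this cfg triggers the `unexpected_cfgs` lint unless
// Cargo.toml declares the flag, e.g. via the `[lints.rust]` table added below:
//
//     [lints.rust]
//     unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
//
// Without that declaration, the build prints roughly:
//     warning: unexpected `cfg` condition name: `tokio_unstable`
#[cfg(tokio_unstable)]
pub fn enable_runtime_metrics() {}

The same mechanism flags `#[cfg(feature = "...")]` values that match no feature declared in Cargo.toml, which is how the `sources-file-descriptor` vs. `sources-file_descriptor` inconsistency showed up.
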
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
@@ -143,6 +143,7 @@ centiseconds
cernan
cfactor
CFFI
+cfgs
CGP
cgroups
chans
11 changes: 7 additions & 4 deletions Cargo.toml
@@ -50,6 +50,9 @@ debug = false # Do not include debug symbols in the executable.
[profile.bench]
debug = true

+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[package.metadata.deb]
name = "vector"
section = "admin"
@@ -538,7 +541,7 @@ sources-logs = [
"sources-nats",
"sources-opentelemetry",
"sources-pulsar",
"sources-file-descriptor",
"sources-file_descriptor",
"sources-redis",
"sources-socket",
"sources-splunk_hec",
@@ -575,7 +578,7 @@ sources-docker_logs = ["docker"]
sources-eventstoredb_metrics = []
sources-exec = []
sources-file = ["vector-lib/file-source"]
-sources-file-descriptor = ["tokio-util/io"]
+sources-file_descriptor = ["tokio-util/io"]
sources-fluent = ["dep:base64", "sources-utils-net-tcp", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost", "dep:prost-types", "protobuf-build", "dep:tonic"]
sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query", "sources-http_server"]
@@ -757,7 +760,7 @@ sinks-file = ["dep:async-compression"]
sinks-gcp = ["sinks-gcp-chronicle", "dep:base64", "gcp"]
sinks-gcp-chronicle = ["gcp"]
sinks-greptimedb_metrics = ["dep:greptimedb-ingester"]
-sinks-greptimedb_logs = []
+sinks-greptimedb_logs = ["dep:greptimedb-ingester"]
sinks-honeycomb = []
sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
@@ -799,6 +802,7 @@ all-integration-tests = [
"datadog-logs-integration-tests",
"datadog-metrics-integration-tests",
"datadog-traces-integration-tests",
"dnstap-integration-tests",
"docker-logs-integration-tests",
"es-integration-tests",
"eventstoredb_metrics-integration-tests",
@@ -822,7 +826,6 @@ all-integration-tests = [
"pulsar-integration-tests",
"redis-integration-tests",
"splunk-integration-tests",
"dnstap-integration-tests",
"webhdfs-integration-tests",
]

2 changes: 1 addition & 1 deletion Tiltfile
@@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo')
docker_build(
ref='timberio/vector',
context='.',
-build_args={'RUST_VERSION': '1.79.0'},
+build_args={'RUST_VERSION': '1.80.0'},
dockerfile='tilt/Dockerfile'
)

2 changes: 1 addition & 1 deletion lib/vector-api-client/src/lib.rs
@@ -6,7 +6,7 @@
//! 2. A GraphQL subscription client, for long-lived, multiplexed subscriptions over WebSockets
//! 3. GraphQL queries/mutations/subscriptions, defined in `graphql/**/*.graphql` files
//! 4. Extension methods for each client, for executing queries/subscriptions, and returning
-//! deserialized JSON responses
+//!    deserialized JSON responses
//!

#![deny(warnings)]
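
The indentation-only doc-comment tweaks here (and in `lib/vector-core/src/transform/mod.rs`, `src/config/watcher.rs`, and `src/http.rs`) appear to address `clippy::doc_lazy_continuation`, a lint shipped with the 1.80 toolchain that asks continuation lines of a Markdown list item to be indented under the item's text. In miniature:

// Warns under clippy 1.80 (lazy continuation of the list item):
/// - Automatically closes the connection after a response
/// with a `Connection: close` header is sent.

// Accepted: the continuation line is indented under the item's text.
/// - Automatically closes the connection after a response
///   with a `Connection: close` header is sent.
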
3 changes: 3 additions & 0 deletions lib/vector-buffers/Cargo.toml
@@ -5,6 +5,9 @@ authors = ["Vector Contributors <[email protected]>"]
edition = "2021"
publish = false

+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[dependencies]
async-recursion = "1.1.1"
async-stream = "0.3.6"
2 changes: 1 addition & 1 deletion lib/vector-buffers/benches/sized_records.rs
@@ -52,7 +52,7 @@ impl DataDir {

fn next(&mut self) -> PathGuard {
let mut nxt = self.base.clone();
-nxt.push(&self.index.to_string());
+nxt.push(self.index.to_string());
self.index += 1;
std::fs::create_dir_all(&nxt).expect("could not make next dir");

3 changes: 3 additions & 0 deletions lib/vector-core/Cargo.toml
@@ -5,6 +5,9 @@ authors = ["Vector Contributors <[email protected]>"]
edition = "2021"
publish = false

+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(ddsketch_extended)'] }

[dependencies]
async-graphql = { version = "7.0.7", default-features = false, features = ["playground" ], optional = true }
async-trait = { version = "0.1", default-features = false }
1 change: 1 addition & 0 deletions lib/vector-core/src/event/discriminant.rs
@@ -292,6 +292,7 @@ mod tests {

#[test]
fn with_hash_map() {
+#[allow(clippy::mutable_key_type)]
let mut map: HashMap<Discriminant, usize> = HashMap::new();

let event_stream_1 = {
2 changes: 1 addition & 1 deletion lib/vector-core/src/transform/mod.rs
@@ -114,7 +114,7 @@ dyn_clone::clone_trait_object!(FunctionTransform);
/// # Invariants
///
/// * It is an illegal invariant to implement `FunctionTransform` for a
-/// `TaskTransform` or vice versa.
+///   `TaskTransform` or vice versa.
pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
fn transform(
self: Box<Self>,
6 changes: 4 additions & 2 deletions lib/vector-tap/src/controller.rs
@@ -451,13 +451,15 @@ async fn tap_handler(
debug!(message = "Stopped tap.", outputs_patterns = ?patterns.for_outputs, inputs_patterns = ?patterns.for_inputs);
}

-#[cfg(all(
+// FIXME: These these should be merged into `src/api/tests.rs` where _most_ of them were moved.
+/*#[cfg(all(
test,
feature = "sinks-blackhole",
feature = "sources-demo_logs",
feature = "transforms-log_to_metric",
feature = "transforms-remap",
-))]
+))]*/
+#[cfg(not(all()))]
mod tests {
use std::time::Duration;

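A note on the `#[cfg(not(all()))]` idiom above: `all()` with zero predicates is always true, so its negation is never true, and the annotated item is compiled out in every build configuration while still having to parse. A minimal illustration:

// `all()` with no arguments evaluates to true, so this cfg is always
// false: the module below is parsed but never compiled or type-checked.
#[cfg(not(all()))]
mod parked_tests {
    // Must remain syntactically valid Rust, but costs nothing at build time.
}
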
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
-channel = "1.79"
+channel = "1.80"
profile = "default"
7 changes: 6 additions & 1 deletion src/config/cmd.rs
@@ -200,6 +200,7 @@ mod tests {
SeedableRng,
};
use serde_json::json;
+use similar_asserts::assert_eq;
use vector_lib::configurable::component::{
SinkDescription, SourceDescription, TransformDescription,
};
@@ -282,7 +283,11 @@

/// Select any 2-4 sources
fn arb_sources() -> impl Strategy<Value = Vec<&'static str>> {
-sample::subsequence(SourceDescription::types(), 2..=4)
+let mut types = SourceDescription::types();
+// The `file_descriptor` source produces different defaults each time it is used, and so
+// will never compare equal below.
+types.retain(|t| *t != "file_descriptor");
+sample::subsequence(types, 2..=4)
}

/// Select any 2-4 transforms
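
The new `use similar_asserts::assert_eq;` import shadows the standard `assert_eq!` macro within the test module, so a mismatch between two large generated configs prints a line-by-line diff rather than dumping both strings whole. A drop-in usage sketch (the strings are placeholders, not Vector's real output):

use similar_asserts::assert_eq;

#[test]
fn outputs_match() {
    let expected = "sources:\n  in:\n    type: demo_logs\n";
    let generated = "sources:\n  in:\n    type: demo_logs\n";
    // Same call shape as the std macro; on failure, similar_asserts
    // renders a readable diff of the two values.
    assert_eq!(expected, generated);
}
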
15 changes: 10 additions & 5 deletions src/config/mod.rs
@@ -687,22 +687,27 @@ mod tests {
}

#[tokio::test]
-#[cfg(unix)]
+#[cfg(all(unix, feature = "sources-file_descriptor"))]
async fn no_conflict_fd_resources() {
+use crate::sources::file_descriptors::file_descriptor::null_fd;
+let fd1 = null_fd().unwrap();
+let fd2 = null_fd().unwrap();
let result = load(
-r#"
+&format!(
+r#"
[sources.file_descriptor1]
type = "file_descriptor"
-fd = 10
+fd = {fd1}
[sources.file_descriptor2]
type = "file_descriptor"
-fd = 20
+fd = {fd2}
[sinks.out]
type = "test_basic"
inputs = ["file_descriptor1", "file_descriptor2"]
-"#,
+"#
+),
Format::Toml,
)
.await;
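
Hard-coding `fd = 10` and `fd = 20` only works if those descriptor numbers happen to be open in the test process; `null_fd()` instead hands each source a real, distinct descriptor. The helper itself is outside this excerpt; a plausible sketch, assuming it opens `/dev/null` and leaks the descriptor so the raw fd stays valid (the actual implementation in `file_descriptors::file_descriptor` may differ):

use std::fs::File;
use std::io;
use std::os::fd::{IntoRawFd, RawFd};

// Hypothetical helper: each call opens /dev/null and converts the File
// into a raw descriptor, skipping the Drop that would close it, so the
// fd stays usable (and unique) for the lifetime of the test.
pub fn null_fd() -> io::Result<RawFd> {
    Ok(File::open("/dev/null")?.into_raw_fd())
}
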
2 changes: 1 addition & 1 deletion src/config/transform.rs
@@ -163,7 +163,7 @@ impl TransformContext {
}
}

-#[cfg(any(test, feature = "test"))]
+#[cfg(test)]
pub fn new_test(
schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
) -> Self {
2 changes: 1 addition & 1 deletion src/config/watcher.rs
@@ -14,7 +14,7 @@ use crate::Error;
/// But, config and topology reload logic can handle:
/// - Invalid config, caused either by user or by data race.
/// - Frequent changes, caused by user/editor modifying/saving file in small chunks.
-/// so we can use smaller, more responsive delay.
+///   so we can use smaller, more responsive delay.
const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
2 changes: 1 addition & 1 deletion src/convert_config.rs
@@ -283,7 +283,7 @@ mod tests {
let extension = path.extension().unwrap().to_str().unwrap();
if extension == Format::Yaml.to_string() {
// Note that here we read the converted string directly.
-let converted_config = fs::read_to_string(&output_dir.join(&path)).unwrap();
+let converted_config = fs::read_to_string(output_dir.join(&path)).unwrap();
assert_eq!(converted_config, original_config);
count += 1;
}
8 changes: 4 additions & 4 deletions src/http.rs
@@ -443,9 +443,9 @@ impl Default for KeepaliveConfig {
///
/// **Notes:**
/// - This is intended to be used in a Hyper server (or similar) that will automatically close
-/// the connection after a response with a `Connection: close` header is sent.
+///   the connection after a response with a `Connection: close` header is sent.
/// - This layer assumes that it is instantiated once per connection, which is true within the
-/// Hyper framework.
+///   Hyper framework.

pub struct MaxConnectionAgeLayer {
start_reference: Instant,
@@ -495,9 +495,9 @@ where
///
/// **Notes:**
/// - This is intended to be used in a Hyper server (or similar) that will automatically close
-/// the connection after a response with a `Connection: close` header is sent.
+///   the connection after a response with a `Connection: close` header is sent.
/// - This service assumes that it is instantiated once per connection, which is true within the
-/// Hyper framework.
+///   Hyper framework.
#[derive(Clone)]
pub struct MaxConnectionAgeService<S> {
service: S,
2 changes: 2 additions & 0 deletions src/internal_events/http.rs
@@ -98,6 +98,7 @@ impl InternalEvent for HttpEventsReceived<'_> {
}
}

+#[cfg(feature = "sources-utils-http")]
#[derive(Debug)]
pub struct HttpBadRequest<'a> {
code: u16,
@@ -116,6 +117,7 @@ impl<'a> HttpBadRequest<'a> {
}
}

+#[cfg(feature = "sources-utils-http")]
impl<'a> InternalEvent for HttpBadRequest<'a> {
fn emit(self) {
warn!(
4 changes: 2 additions & 2 deletions src/internal_events/mod.rs
@@ -49,7 +49,7 @@ mod encoding_transcode;
mod eventstoredb_metrics;
#[cfg(feature = "sources-exec")]
mod exec;
-#[cfg(any(feature = "sources-file-descriptor", feature = "sources-stdin"))]
+#[cfg(any(feature = "sources-file_descriptor", feature = "sources-stdin"))]
mod file_descriptor;
#[cfg(feature = "transforms-filter")]
mod filter;
@@ -189,7 +189,7 @@ pub(crate) use self::exec::*;
feature = "sinks-file",
))]
pub(crate) use self::file::*;
-#[cfg(any(feature = "sources-file-descriptor", feature = "sources-stdin"))]
+#[cfg(any(feature = "sources-file_descriptor", feature = "sources-stdin"))]
pub(crate) use self::file_descriptor::*;
#[cfg(feature = "transforms-filter")]
pub(crate) use self::filter::*;
1 change: 0 additions & 1 deletion src/internal_events/prelude.rs
@@ -2,7 +2,6 @@
feature = "sources-apache_metrics",
feature = "sources-aws_ecs_metrics",
feature = "sources-aws_kinesis_firehose",
feature = "sources-http-client",
feature = "sources-utils-http",
))]
pub(crate) fn http_error_code(code: u16) -> String {
2 changes: 2 additions & 0 deletions src/internal_events/tcp.rs
@@ -44,12 +44,14 @@ impl InternalEvent for TcpSocketConnectionShutdown {
}
}

+#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug)]
pub struct TcpSocketError<'a, E> {
pub(crate) error: &'a E,
pub peer_addr: SocketAddr,
}

+#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
fn emit(self) {
error!(
10 changes: 10 additions & 0 deletions src/internal_events/unix.rs
@@ -32,12 +32,20 @@ impl<E: std::error::Error> InternalEvent for UnixSocketOutgoingConnectionError<E
}
}

+#[cfg(all(
+unix,
+any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
+))]
#[derive(Debug)]
pub struct UnixSocketError<'a, E> {
pub(crate) error: &'a E,
pub path: &'a std::path::Path,
}

+#[cfg(all(
+unix,
+any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
+))]
impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
fn emit(self) {
error!(
@@ -57,12 +65,14 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
}
}

+#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
#[derive(Debug)]
pub struct UnixSocketSendError<'a, E> {
pub(crate) error: &'a E,
pub path: &'a std::path::Path,
}

+#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
fn emit(self) {
let reason = "Unix socket send error.";
2 changes: 1 addition & 1 deletion src/kafka.rs
@@ -115,7 +115,7 @@ impl KafkaAuthConfig {
if let Some(verify_certificate) = &tls.options.verify_certificate {
client.set(
"enable.ssl.certificate.verification",
-&verify_certificate.to_string(),
+verify_certificate.to_string(),
);
}

4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
@@ -123,7 +123,7 @@ async fn firehose_put_records_without_partition_key() {
#[allow(clippy::needless_collect)] // https://github.com/rust-lang/rust-clippy/issues/6909
let input = input
.into_iter()
-.map(|rec| serde_json::to_value(&rec.into_log()).unwrap())
+.map(|rec| serde_json::to_value(rec.into_log()).unwrap())
.collect::<Vec<_>>();
for hit in hits {
let hit = hit
@@ -235,7 +235,7 @@ async fn firehose_put_records_with_partition_key() {
#[allow(clippy::needless_collect)] // https://github.com/rust-lang/rust-clippy/issues/6909
let input = input
.into_iter()
-.map(|rec| serde_json::to_value(&rec.into_log()).unwrap())
+.map(|rec| serde_json::to_value(rec.into_log()).unwrap())
.collect::<Vec<_>>();
for hit in hits {
let hit = hit
3 changes: 3 additions & 0 deletions src/sinks/elasticsearch/encoder.rs
@@ -126,6 +126,9 @@ impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
written_bytes +=
as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
writer.write_all(&[b'\n'])?;
+// False positive clippy hit on the following line. Clippy wants us to skip the
+// borrow, but then the value is moved for the following line.
+#[allow(clippy::needless_borrows_for_generic_args)]
serde_json::to_writer(&mut writer, log)?;
writer.write_all(&[b'\n'])?;
Ok(())
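
For readers puzzling over that allow: `serde_json::to_writer` takes its writer by value (any `W: io::Write`, and `&mut W` also implements `Write`), so the borrow is precisely what keeps `writer` usable for the trailing newline; dropping it, as clippy suggests, would move the writer. The same pattern in isolation (illustrative names, not the sink's real types):

use std::io::{self, Write};

fn encode_ndjson(log: &serde_json::Value) -> io::Result<Vec<u8>> {
    let mut writer = Vec::new();
    writer.write_all(b"\n")?;
    // Passing `writer` by value would move it into `to_writer`;
    // the `&mut` borrow leaves it available for the final write.
    serde_json::to_writer(&mut writer, log)?;
    writer.write_all(b"\n")?;
    Ok(writer)
}
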
2 changes: 1 addition & 1 deletion src/sinks/elasticsearch/integration_tests.rs
@@ -698,7 +698,7 @@ async fn run_insert_tests_with_config(
// https://github.com/rust-lang/rust-clippy/issues/6909
let input = input
.into_iter()
-.map(|rec| serde_json::to_value(&rec.into_log()).unwrap())
+.map(|rec| serde_json::to_value(rec.into_log()).unwrap())
.collect::<Vec<_>>();

for hit in hits {
Expand Down