diff --git a/Tiltfile b/Tiltfile index 22902f119b266..28cfedc46312b 100644 --- a/Tiltfile +++ b/Tiltfile @@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo') docker_build( ref='timberio/vector', context='.', - build_args={'RUST_VERSION': '1.80.0'}, + build_args={'RUST_VERSION': '1.81.0'}, dockerfile='tilt/Dockerfile' ) diff --git a/lib/prometheus-parser/src/line.rs b/lib/prometheus-parser/src/line.rs index 28bd432b451be..f5698648da065 100644 --- a/lib/prometheus-parser/src/line.rs +++ b/lib/prometheus-parser/src/line.rs @@ -373,7 +373,7 @@ fn parse_name(input: &str) -> IResult { } fn trim_space(input: &str) -> &str { - input.trim_start_matches(|c| c == ' ' || c == '\t') + input.trim_start_matches([' ', '\t']) } fn sp<'a, E: ParseError<&'a str>>(i: &'a str) -> nom::IResult<&'a str, &'a str, E> { diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 39a085c8b80a3..ed7d1c8b473e6 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -625,7 +625,7 @@ impl EventDataEq for LogEvent { #[cfg(any(test, feature = "test"))] mod test_utils { - use super::*; + use super::{log_schema, Bytes, LogEvent, Utc}; // these rely on the global log schema, which is no longer supported when using the // "LogNamespace::Vector" namespace. diff --git a/lib/vector-core/src/schema/definition.rs b/lib/vector-core/src/schema/definition.rs index be3c20a214f75..aff0522606774 100644 --- a/lib/vector-core/src/schema/definition.rs +++ b/lib/vector-core/src/schema/definition.rs @@ -537,7 +537,7 @@ impl Definition { #[cfg(any(test, feature = "test"))] mod test_utils { - use super::*; + use super::{Definition, Kind}; use crate::event::{Event, LogEvent}; impl Definition { diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 21c73f28f46c7..6f4d5bc316580 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.80" +channel = "1.81" profile = "default" diff --git a/src/api/schema/metrics/sink/generic.rs b/src/api/schema/metrics/sink/generic.rs index e0f96e38fbd6d..83c50bc6557b7 100644 --- a/src/api/schema/metrics/sink/generic.rs +++ b/src/api/schema/metrics/sink/generic.rs @@ -9,7 +9,7 @@ use crate::{ pub struct GenericSinkMetrics(Vec); impl GenericSinkMetrics { - pub fn new(metrics: Vec) -> Self { + pub const fn new(metrics: Vec) -> Self { Self(metrics) } } diff --git a/src/api/schema/metrics/source/file.rs b/src/api/schema/metrics/source/file.rs index 24e8fbf64b8d6..bebe6bbf6b810 100644 --- a/src/api/schema/metrics/source/file.rs +++ b/src/api/schema/metrics/source/file.rs @@ -57,7 +57,7 @@ impl<'a> FileSourceMetricFile<'a> { pub struct FileSourceMetrics(Vec); impl FileSourceMetrics { - pub fn new(metrics: Vec) -> Self { + pub const fn new(metrics: Vec) -> Self { Self(metrics) } diff --git a/src/api/schema/metrics/source/generic.rs b/src/api/schema/metrics/source/generic.rs index c66d50a841c91..12435c1b36e90 100644 --- a/src/api/schema/metrics/source/generic.rs +++ b/src/api/schema/metrics/source/generic.rs @@ -9,7 +9,7 @@ use crate::{ pub struct GenericSourceMetrics(Vec); impl GenericSourceMetrics { - pub fn new(metrics: Vec) -> Self { + pub const fn new(metrics: Vec) -> Self { Self(metrics) } } diff --git a/src/api/schema/metrics/transform/generic.rs b/src/api/schema/metrics/transform/generic.rs index 0fd2569551473..854a845226cc1 100644 --- a/src/api/schema/metrics/transform/generic.rs +++ b/src/api/schema/metrics/transform/generic.rs @@ -9,7 +9,7 @@ use crate::{ pub struct GenericTransformMetrics(Vec); impl GenericTransformMetrics { - pub fn new(metrics: Vec) -> Self { + pub const fn new(metrics: Vec) -> Self { Self(metrics) } } diff --git a/src/api/server.rs b/src/api/server.rs index 3c4860f9f2e8e..caf6512b62d3e 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -45,12 +45,11 @@ impl Server { let _guard = handle.enter(); let addr = config.api.address.expect("No socket address"); - let incoming = AddrIncoming::bind(&addr).map_err(|error| { + let incoming = AddrIncoming::bind(&addr).inspect_err(|error| { emit!(SocketBindError { mode: SocketMode::Tcp, - error: &error, + error, }); - error })?; let span = Span::current(); diff --git a/src/app.rs b/src/app.rs index e27d68e5a6cab..5593d8fd0979c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -434,12 +434,11 @@ impl FinishedApplication { fn get_log_levels(default: &str) -> String { std::env::var("VECTOR_LOG") .or_else(|_| { - std::env::var("LOG").map(|log| { + std::env::var("LOG").inspect(|_log| { warn!( message = "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead." ); - log }) }) .unwrap_or_else(|_| default.into()) diff --git a/src/aws/mod.rs b/src/aws/mod.rs index a4a68d34a2f96..775bb4c08a85f 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -363,7 +363,7 @@ struct MeasuredBody { } impl MeasuredBody { - fn new(body: SdkBody, shared_bytes_sent: Arc) -> Self { + const fn new(body: SdkBody, shared_bytes_sent: Arc) -> Self { Self { inner: body, shared_bytes_sent, diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index 90fc349f0a1ef..514153ad2b16b 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -132,7 +132,7 @@ pub struct ValidationConfiguration { impl ValidationConfiguration { /// Creates a new `ValidationConfiguration` for a source. - pub fn from_source( + pub const fn from_source( component_name: &'static str, log_namespace: LogNamespace, component_configurations: Vec, @@ -146,7 +146,7 @@ impl ValidationConfiguration { } /// Creates a new `ValidationConfiguration` for a transform. - pub fn from_transform( + pub const fn from_transform( component_name: &'static str, log_namespace: LogNamespace, component_configurations: Vec, @@ -160,7 +160,7 @@ impl ValidationConfiguration { } /// Creates a new `ValidationConfiguration` for a sink. - pub fn from_sink( + pub const fn from_sink( component_name: &'static str, log_namespace: LogNamespace, component_configurations: Vec, diff --git a/src/generate.rs b/src/generate.rs index c4032f0fde148..f60ff66a2bb9d 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -119,7 +119,7 @@ pub(crate) fn generate_example( ) -> Result> { let components: Vec> = opts .expression - .split(|c| c == '|' || c == '/') + .split(['|', '/']) .map(|s| { s.split(',') .map(|s| s.trim().to_string()) diff --git a/src/http.rs b/src/http.rs index e2d16d11589fa..f8e1c939c58c8 100644 --- a/src/http.rs +++ b/src/http.rs @@ -139,13 +139,9 @@ where // Handle the errors and extract the response. let response = response_result - .map_err(|error| { + .inspect_err(|error| { // Emit the error into the internal events system. - emit!(http_client::GotHttpWarning { - error: &error, - roundtrip - }); - error + emit!(http_client::GotHttpWarning { error, roundtrip }); }) .context(CallRequestSnafu)?; diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index babc502b9c193..44c475f208cae 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -22,7 +22,7 @@ pub(super) struct AmqpRequest { } impl AmqpRequest { - pub(super) fn new( + pub(super) const fn new( body: Bytes, exchange: String, routing_key: String, diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index d22f9d2acdf59..00f06301eb164 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -101,7 +101,7 @@ async fn firehose_put_records_without_partition_key() { .expect("Could not build HTTP client"); let response = client - .get(&format!("{}/{}/_search", common.base_url, stream)) + .get(format!("{}/{}/_search", common.base_url, stream)) .json(&json!({ "query": { "query_string": { "query": "*" } } })) @@ -213,7 +213,7 @@ async fn firehose_put_records_with_partition_key() { .expect("Could not build HTTP client"); let response = client - .get(&format!("{}/{}/_search", common.base_url, stream)) + .get(format!("{}/{}/_search", common.base_url, stream)) .json(&json!({ "query": { "query_string": { "query": "*" } } })) diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index 6020d943aa865..57958858f0122 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -76,9 +76,8 @@ async fn kinesis_put_records_with_partition_key() { let mut output_lines = records .into_iter() - .map(|e| { + .inspect(|e| { assert_eq!(partition_value, e.partition_key()); - e }) .map(|e| String::from_utf8(e.data.into_inner()).unwrap()) .collect::>(); diff --git a/src/sinks/aws_s_s/sink.rs b/src/sinks/aws_s_s/sink.rs index d933dcd752945..730d42abf9e91 100644 --- a/src/sinks/aws_s_s/sink.rs +++ b/src/sinks/aws_s_s/sink.rs @@ -2,15 +2,6 @@ use super::{client::Client, request_builder::SSRequestBuilder, service::SSServic use crate::sinks::aws_s_s::retry::SSRetryLogic; use crate::sinks::prelude::*; -#[derive(Clone, Copy, Debug, Default)] -pub(crate) struct SqsSinkDefaultBatchSettings; - -impl SinkBatchSettings for SqsSinkDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1); - const MAX_BYTES: Option = Some(262_144); - const TIMEOUT_SECS: f64 = 1.0; -} - #[derive(Clone)] pub(super) struct SSSink where diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index eed5a068c48fa..d78e39fc31837 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -17,7 +17,7 @@ pub struct AzureBlobService { } impl AzureBlobService { - pub fn new(client: Arc) -> AzureBlobService { + pub const fn new(client: Arc) -> AzureBlobService { AzureBlobService { client } } } diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 6dc4049063f58..6e2ca1df8f3c3 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -39,7 +39,7 @@ pub struct LogSinkBuilder { } impl LogSinkBuilder { - pub fn new( + pub const fn new( transformer: Transformer, service: S, default_api_key: Arc, diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs index 004a90b4b74d3..306d67ad17fcf 100644 --- a/src/sinks/datadog/traces/request_builder.rs +++ b/src/sinks/datadog/traces/request_builder.rs @@ -71,7 +71,7 @@ pub struct DatadogTracesRequestBuilder { } impl DatadogTracesRequestBuilder { - pub fn new( + pub const fn new( api_key: Arc, endpoint_configuration: DatadogTracesEndpointConfiguration, compression: Compression, diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 5973e9689d0ea..122d38f19ac4c 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -125,12 +125,12 @@ impl Encoder> for ElasticsearchEncoder { )?; written_bytes += as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| { - writer.write_all(&[b'\n'])?; + writer.write_all(b"\n")?; // False positive clippy hit on the following line. Clippy wants us to skip the // borrow, but then the value is moved for the following line. #[allow(clippy::needless_borrows_for_generic_args)] serde_json::to_writer(&mut writer, log)?; - writer.write_all(&[b'\n'])?; + writer.write_all(b"\n")?; Ok(()) })?; } diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index 6f32821212a78..060720e1545c0 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -199,7 +199,7 @@ async fn structures_events_correctly() { flush(common).await.unwrap(); let response = reqwest::Client::new() - .get(&format!("{}/{}/_search", base_url, index)) + .get(format!("{}/{}/_search", base_url, index)) .json(&json!({ "query": { "query_string": { "query": "*" } } })) @@ -669,7 +669,7 @@ async fn run_insert_tests_with_config( let client = create_http_client(); let mut response = client - .get(&format!("{}/{}/_search", base_url, index)) + .get(format!("{}/{}/_search", base_url, index)) .basic_auth("elastic", Some("vector")) .json(&json!({ "query": { "query_string": { "query": "*" } } @@ -758,7 +758,7 @@ async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) let mut total = 0; for base_url in base_urls { if let Ok(response) = client - .get(&format!("{}/{}/_search", base_url, index)) + .get(format!("{}/{}/_search", base_url, index)) .basic_auth("elastic", Some("vector")) .json(&json!({ "query": { "query_string": { "query": "*" } } diff --git a/src/sinks/file/bytes_path.rs b/src/sinks/file/bytes_path.rs index 7e48b390bc372..238d6884e2bf4 100644 --- a/src/sinks/file/bytes_path.rs +++ b/src/sinks/file/bytes_path.rs @@ -14,7 +14,7 @@ pub struct BytesPath { impl BytesPath { #[cfg(unix)] - pub fn new(path: Bytes) -> Self { + pub const fn new(path: Bytes) -> Self { Self { path } } #[cfg(windows)] diff --git a/src/sinks/greptimedb/logs/integration_tests.rs b/src/sinks/greptimedb/logs/integration_tests.rs index 65b0c9c984954..1a184fa8a9aa3 100644 --- a/src/sinks/greptimedb/logs/integration_tests.rs +++ b/src/sinks/greptimedb/logs/integration_tests.rs @@ -74,7 +74,7 @@ impl GreptimeClient { async fn create_pipeline(&self, pipeline_name: &str, pipeline_content: &str) { self.client - .post(&format!( + .post(format!( "{}/v1/events/pipelines/{}", self.endpoint, pipeline_name )) @@ -87,7 +87,7 @@ impl GreptimeClient { async fn query(&self, sql: &str) -> String { self.client - .get(&format!("{}/v1/sql", self.endpoint)) + .get(format!("{}/v1/sql", self.endpoint)) .query(&[("sql", sql)]) .send() .await diff --git a/src/sinks/greptimedb/metrics/integration_tests.rs b/src/sinks/greptimedb/metrics/integration_tests.rs index 640dc53ac2de3..f6bb782b2a820 100644 --- a/src/sinks/greptimedb/metrics/integration_tests.rs +++ b/src/sinks/greptimedb/metrics/integration_tests.rs @@ -29,7 +29,7 @@ async fn test_greptimedb_sink() { // Drop the table and data inside let _ = query_client - .get(&format!( + .get(format!( "{}/v1/sql", std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned()) )) @@ -45,7 +45,7 @@ async fn test_greptimedb_sink() { run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await; let query_response = query_client - .get(&format!( + .get(format!( "{}/v1/sql", std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned()) )) @@ -85,7 +85,7 @@ new_naming = true // Drop the table and data inside let _ = query_client - .get(&format!( + .get(format!( "{}/v1/sql", std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned()) )) @@ -101,7 +101,7 @@ new_naming = true run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await; let query_response = query_client - .get(&format!( + .get(format!( "{}/v1/sql", std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned()) )) diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index 12dacd5d23d27..7c0cb5608b7a2 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -471,7 +471,7 @@ pub mod test_util { pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response { client() - .get(&format!("{}/query", endpoint)) + .get(format!("{}/query", endpoint)) .query(&[("q", query)]) .send() .await diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index 7a9649b2cedc1..0656036785b9a 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -45,7 +45,7 @@ pub(super) struct MetricData { } impl MetricsApiModel { - pub(super) fn new(metrics: Vec) -> Self { + pub(super) const fn new(metrics: Vec) -> Self { Self([MetricDataStore { metrics }]) } } @@ -148,7 +148,7 @@ impl TryFrom> for MetricsApiModel { pub(super) struct EventsApiModel(pub Vec); impl EventsApiModel { - pub(super) fn new(events_array: Vec) -> Self { + pub(super) const fn new(events_array: Vec) -> Self { Self(events_array) } } @@ -265,7 +265,7 @@ pub(super) enum Timestamp { } impl LogsApiModel { - pub(super) fn new(logs: Vec) -> Self { + pub(super) const fn new(logs: Vec) -> Self { Self([LogDataStore { logs }]) } } diff --git a/src/sinks/prometheus/collector.rs b/src/sinks/prometheus/collector.rs index ccefcb653475d..2024e962b11db 100644 --- a/src/sinks/prometheus/collector.rs +++ b/src/sinks/prometheus/collector.rs @@ -301,7 +301,7 @@ impl StringCollector { let mut result = String::with_capacity(key.len() + value.len() + 3); result.push_str(key); result.push_str("=\""); - while let Some(i) = value.find(|ch| ch == '\\' || ch == '"') { + while let Some(i) = value.find(['\\', '"']) { result.push_str(&value[..i]); result.push('\\'); // Ugly but works because we know the character at `i` is ASCII diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 154ca256e0a08..b8bdf84fb0867 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -105,12 +105,11 @@ pub fn write_all( n_events_pending: usize, buf: &[u8], ) -> io::Result<()> { - writer.write_all(buf).map_err(|error| { + writer.write_all(buf).inspect_err(|error| { emit!(EncoderWriteError { - error: &error, + error, count: n_events_pending, }); - error }) } diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 0904a67cb0468..30306335188ef 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -401,9 +401,8 @@ where let http_client = self.inner.clone(); Box::pin(async move { - let request = request_builder(body).await.map_err(|error| { - emit!(SinkRequestBuildError { error: &error }); - error + let request = request_builder(body).await.inspect_err(|error| { + emit!(SinkRequestBuildError { error }); })?; let byte_size = request.body().len(); let request = request.map(Body::from); @@ -618,7 +617,7 @@ pub struct HttpRequest { impl HttpRequest { /// Creates a new `HttpRequest`. - pub fn new( + pub const fn new( payload: Bytes, finalizers: EventFinalizers, request_metadata: RequestMetadata, diff --git a/src/sinks/util/snappy.rs b/src/sinks/util/snappy.rs index a46edd41ebf97..8ab59cdda23dc 100644 --- a/src/sinks/util/snappy.rs +++ b/src/sinks/util/snappy.rs @@ -19,7 +19,7 @@ pub struct SnappyEncoder { } impl SnappyEncoder { - pub fn new(writer: W) -> Self { + pub const fn new(writer: W) -> Self { Self { writer, buffer: Vec::new(), diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 46706328c0b1f..2982e6696e1e2 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -481,11 +481,10 @@ impl Inner { let mut unsent_event_count = UnsentEventCount::new(events.len()); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { let count = events.len(); - self.send(events).await.map_err(|err| { + self.send(events).await.inspect_err(|_| { // The unsent event count is discarded here because the caller emits the // `StreamClosedError`. unsent_event_count.discard(); - err })?; unsent_event_count.decr(count); } diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 9ee3475c8d2cb..e14522a6032a9 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -361,15 +361,13 @@ impl IngestorProcess { async fn run_once(&mut self) { let messages = self.receive_messages().await; let messages = messages - .map(|messages| { + .inspect(|messages| { emit!(SqsMessageReceiveSucceeded { count: messages.len(), }); - messages }) - .map_err(|err| { - emit!(SqsMessageReceiveError { error: &err }); - err + .inspect_err(|err| { + emit!(SqsMessageReceiveError { error: err }); }) .unwrap_or_default(); @@ -556,9 +554,8 @@ impl IngestorProcess { let lines: Box + Send + Unpin> = Box::new( FramedRead::new(object_reader, self.state.decoder.framer.clone()) .map(|res| { - res.map(|bytes| { + res.inspect(|bytes| { bytes_received.emit(ByteSize(bytes.len())); - bytes }) .map_err(|err| { read_error = Some(err); diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index a69533cc2e48c..f328c88615956 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -256,7 +256,7 @@ async fn source( async fn send_with_path(address: SocketAddr, body: &str, headers: HeaderMap, path: &str) -> u16 { reqwest::Client::new() - .post(&format!("http://{}{}", address, path)) + .post(format!("http://{}{}", address, path)) .headers(headers) .body(body.to_owned()) .send() diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index b229d74786f0d..7372134e4730a 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -473,13 +473,12 @@ impl Decoder for FluentDecoder { src.advance(byte_size); - let maybe_item = self.handle_message(res, byte_size).map_err(|error| { + let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| { let base64_encoded_message = BASE64_STANDARD.encode(&src[..]); emit!(FluentMessageDecodeError { - error: &error, + error, base64_encoded_message }); - error })?; if let Some(item) = maybe_item { return Ok(Some(item)); @@ -1137,7 +1136,7 @@ mod integration_tests { .run(async move { wait_for_tcp(test_address).await; reqwest::Client::new() - .post(&format!("http://{}/", test_address)) + .post(format!("http://{}/", test_address)) .header("content-type", "application/json") .body(body.to_string()) .send() @@ -1218,7 +1217,7 @@ mod integration_tests { .run(async move { wait_for_tcp(test_address).await; reqwest::Client::new() - .post(&format!("http://{}/", test_address)) + .post(format!("http://{}/", test_address)) .header("content-type", "application/json") .body(body.to_string()) .send() diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 3cb202199f3cd..814e0fbb26bd3 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -1109,7 +1109,7 @@ mod integration_tests { let response = self.client.send(request).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); - serde_json::from_str(&String::from_utf8(body.to_vec()).unwrap()).unwrap() + serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap() } async fn shutdown_check(&self, shutdown: shutdown::SourceShutdownCoordinator) { diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 3462cfacc2e43..ed649e2b826b9 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -656,7 +656,7 @@ mod tests { async fn send(address: SocketAddr, body: &str) -> u16 { reqwest::Client::new() - .post(&format!("http://{}/", address)) + .post(format!("http://{}/", address)) .body(body.to_owned()) .send() .await @@ -667,7 +667,7 @@ mod tests { async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 { reqwest::Client::new() - .post(&format!("http://{}/", address)) + .post(format!("http://{}/", address)) .headers(headers) .body(body.to_owned()) .send() @@ -679,7 +679,7 @@ mod tests { async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 { reqwest::Client::new() - .post(&format!("http://{}?{}", address, query)) + .post(format!("http://{}?{}", address, query)) .body(body.to_owned()) .send() .await @@ -690,7 +690,7 @@ mod tests { async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 { reqwest::Client::new() - .post(&format!("http://{}{}", address, path)) + .post(format!("http://{}{}", address, path)) .body(body.to_owned()) .send() .await @@ -701,9 +701,8 @@ mod tests { async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 { let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap(); - format!("method: {}", method.as_str()); reqwest::Client::new() - .request(method, &format!("http://{}{}", address, path)) + .request(method, format!("http://{address}{path}")) .body(body.to_owned()) .send() .await @@ -714,7 +713,7 @@ mod tests { async fn send_bytes(address: SocketAddr, body: Vec, headers: HeaderMap) -> u16 { reqwest::Client::new() - .post(&format!("http://{}/", address)) + .post(format!("http://{address}/")) .headers(headers) .body(body) .send() diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index c1cdc13c7f008..78290f5638961 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -22,7 +22,7 @@ pub struct K8sPathsProvider { impl K8sPathsProvider { /// Create a new [`K8sPathsProvider`]. - pub fn new( + pub const fn new( pod_state: Store, namespace_state: Store, include_paths: Vec, diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 59a2cce7f6eb3..d9716c8596ea5 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1863,7 +1863,7 @@ mod tests { let (_source, address) = source(None).await; let res = reqwest::Client::new() - .get(&format!("http://{}/services/collector/health", address)) + .get(format!("http://{}/services/collector/health", address)) .header("Authorization", format!("Splunk {}", "invalid token")) .send() .await @@ -1877,7 +1877,7 @@ mod tests { let (_source, address) = source(None).await; let res = reqwest::Client::new() - .get(&format!("http://{}/services/collector/health", address)) + .get(format!("http://{}/services/collector/health", address)) .send() .await .unwrap(); diff --git a/src/sources/util/unix_datagram.rs b/src/sources/util/unix_datagram.rs index 7800b760090f5..20be4c474c2e0 100644 --- a/src/sources/util/unix_datagram.rs +++ b/src/sources/util/unix_datagram.rs @@ -81,9 +81,8 @@ async fn listen( let span = info_span!("datagram"); let received_from = if !address.is_unnamed() { - let path = address.as_pathname().map(|e| e.to_owned()).map(|path| { - span.record("peer_path", field::debug(&path)); - path + let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| { + span.record("peer_path", field::debug(path)); }); path.map(|p| p.to_string_lossy().into_owned().into()) diff --git a/src/top/state.rs b/src/top/state.rs index 09026bbde13f8..b6db5cef2bc67 100644 --- a/src/top/state.rs +++ b/src/top/state.rs @@ -80,7 +80,7 @@ pub struct State { } impl State { - pub fn new(components: BTreeMap) -> Self { + pub const fn new(components: BTreeMap) -> Self { Self { connection_status: ConnectionStatus::Pending, components, diff --git a/src/topology/test/end_to_end.rs b/src/topology/test/end_to_end.rs index 91b37adc02b34..9434af5003df0 100644 --- a/src/topology/test/end_to_end.rs +++ b/src/topology/test/end_to_end.rs @@ -66,7 +66,7 @@ pub fn http_client( let sender = tokio::spawn(async move { let result = reqwest::Client::new() - .post(&format!("http://{}/", address)) + .post(format!("http://{}/", address)) .body(body) .send() .await diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index ff004aa9c515b..9b599f5fe704e 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -156,7 +156,7 @@ impl Lua { globals.raw_set("event", LuaEvent { inner: event })?; let func = lua.registry_value::>(&self.vector_func)?; - func.call(())?; + func.call::<(), ()>(())?; let result = globals .raw_get::<_, Option>("event") diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs index 450f2b2fe0186..72e9558d3f5bd 100644 --- a/src/transforms/lua/v2/mod.rs +++ b/src/transforms/lua/v2/mod.rs @@ -260,7 +260,7 @@ impl Lua { } if let Some(source) = &config.source { - lua.load(source).eval().context(InvalidSourceSnafu)?; + lua.load(source).eval::<()>().context(InvalidSourceSnafu)?; } let hook_init_code = config.hooks.init.as_ref(); diff --git a/src/transforms/reduce/merge_strategy.rs b/src/transforms/reduce/merge_strategy.rs index b58d5db94bc98..1aeef9eab4d74 100644 --- a/src/transforms/reduce/merge_strategy.rs +++ b/src/transforms/reduce/merge_strategy.rs @@ -158,7 +158,7 @@ struct ConcatArrayMerger { } impl ConcatArrayMerger { - fn new(v: Vec) -> Self { + const fn new(v: Vec) -> Self { Self { v } } } @@ -216,7 +216,7 @@ struct LongestArrayMerger { } impl LongestArrayMerger { - fn new(v: Vec) -> Self { + const fn new(v: Vec) -> Self { Self { v } } } @@ -252,7 +252,7 @@ struct ShortestArrayMerger { } impl ShortestArrayMerger { - fn new(v: Vec) -> Self { + const fn new(v: Vec) -> Self { Self { v } } } diff --git a/vdev/src/commands/check/licenses.rs b/vdev/src/commands/check/licenses.rs index fd68a4c490c27..9d1a246905e13 100644 --- a/vdev/src/commands/check/licenses.rs +++ b/vdev/src/commands/check/licenses.rs @@ -9,9 +9,8 @@ pub struct Cli {} impl Cli { pub fn exec(self) -> Result<()> { - app::exec("dd-rust-license-tool", ["check"], true).map_err(|err| { + app::exec("dd-rust-license-tool", ["check"], true).inspect_err(|_| { info!("Run `cargo vdev build licenses` to regenerate the file"); - err }) } }