Skip to content

Commit 6c9c1f2

Browse files
authored
Merge pull request #246 from yra1029/Add_producer_timestamp
feat: Added the support for the new format of producer message with timestamp support
2 parents 16d964a + a8889fc commit 6c9c1f2

File tree

4 files changed

+216
-2
lines changed

4 files changed

+216
-2
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ flate2 = { version = "1.0", optional = true }
2222
openssl = { version = "0.10", optional = true }
2323
openssl-sys = { version = "0.9", optional = true }
2424
snap = { version = "1.1", optional = true }
25+
chrono = {version = "0.4", optional = true}
2526
thiserror = "1.0"
2627
tracing = "0.1"
2728

@@ -38,5 +39,6 @@ default = ["snappy", "gzip", "security"]
3839
snappy = ["snap"]
3940
gzip = ["flate2"]
4041
security = ["openssl", "openssl-sys"]
42+
producer_timestamp = ["chrono"]
4143
nightly = []
4244
integration_tests = []

src/client/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ use crate::protocol::list_offset::ListOffsetVersion;
1919
pub use crate::utils::PartitionOffset;
2020
use crate::utils::TimestampedPartitionOffset;
2121

22+
#[cfg(feature = "producer_timestamp")]
23+
pub use crate::protocol::produce::ProducerTimestamp;
24+
25+
#[cfg(not(feature = "producer_timestamp"))]
26+
use crate::protocol::produce::ProducerTimestamp;
27+
2228
#[cfg(feature = "security")]
2329
pub use self::network::SecurityConfig;
2430

@@ -78,6 +84,9 @@ pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME
7884
/// The default value for `KafkaClient::set_connection_idle_timeout(..)`
7985
pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS: u64 = 540_000;
8086

87+
/// The default value for `KafkaClient::set_producer_timestamp(..)`
88+
pub(crate) const DEFAULT_PRODUCER_TIMESTAMP: Option<ProducerTimestamp> = None;
89+
8190
/// Client struct keeping track of brokers and topic metadata.
8291
///
8392
/// Implements methods described by the [Kafka Protocol](http://kafka.apache.org/protocol.html).
@@ -122,6 +131,9 @@ struct ClientConfig {
122131
// ~ the number of repeated retry attempts; prevents endless
123132
// repetition of a retry attempt
124133
retry_max_attempts: u32,
134+
// ~ producer's message timestamp option CreateTime/LogAppendTime
135+
#[allow(unused)]
136+
producer_timestamp: Option<ProducerTimestamp>,
125137
}
126138

127139
// --------------------------------------------------------------------
@@ -408,6 +420,7 @@ impl KafkaClient {
408420
offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
409421
retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
410422
retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
423+
producer_timestamp: DEFAULT_PRODUCER_TIMESTAMP,
411424
},
412425
conn_pool: network::Connections::new(
413426
default_conn_rw_timeout(),
@@ -477,6 +490,7 @@ impl KafkaClient {
477490
offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
478491
retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
479492
retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
493+
producer_timestamp: DEFAULT_PRODUCER_TIMESTAMP,
480494
},
481495
conn_pool: network::Connections::new_with_security(
482496
default_conn_rw_timeout(),
@@ -722,6 +736,31 @@ impl KafkaClient {
722736
self.conn_pool.idle_timeout()
723737
}
724738

739+
#[cfg(feature = "producer_timestamp")]
740+
/// Sets the compression algorithm to use when sending out messages.
741+
///
742+
/// # Example
743+
///
744+
/// ```no_run
745+
/// use kafka::client::{Compression, KafkaClient};
746+
///
747+
/// let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
748+
/// client.load_metadata_all().unwrap();
749+
/// client.set_producer_timestamp(Timestamp::CreateTime);
750+
/// ```
751+
#[inline]
752+
pub fn set_producer_timestamp(&mut self, producer_timestamp: Option<ProducerTimestamp>) {
753+
self.config.producer_timestamp = producer_timestamp;
754+
}
755+
756+
#[cfg(feature = "producer_timestamp")]
757+
/// Retrieves the current `KafkaClient::producer_timestamp` setting.
758+
#[inline]
759+
#[must_use]
760+
pub fn producer_timestamp(&self) -> Option<ProducerTimestamp> {
761+
self.config.producer_timestamp
762+
}
763+
725764
/// Provides a view onto the currently loaded metadata of known .
726765
///
727766
/// # Examples
@@ -1455,6 +1494,8 @@ impl KafkaClientInternals for KafkaClient {
14551494
correlation,
14561495
&config.client_id,
14571496
config.compression,
1497+
#[cfg(feature = "producer_timestamp")]
1498+
config.producer_timestamp,
14581499
)
14591500
})
14601501
.add(msg.topic, msg.partition, msg.key, msg.value),

src/producer.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ use std::slice::from_ref;
6868
use std::time::Duration;
6969
use twox_hash::XxHash32;
7070

71+
#[cfg(feature = "producer_timestamp")]
72+
use crate::protocol::produce::ProducerTimestamp;
73+
7174
#[cfg(feature = "security")]
7275
use crate::client::SecurityConfig;
7376

@@ -360,6 +363,8 @@ pub struct Builder<P = DefaultPartitioner> {
360363
partitioner: P,
361364
security_config: Option<SecurityConfig>,
362365
client_id: Option<String>,
366+
#[cfg(feature = "producer_timestamp")]
367+
producer_timestamp: Option<ProducerTimestamp>,
363368
}
364369

365370
impl Builder {
@@ -376,6 +381,8 @@ impl Builder {
376381
partitioner: DefaultPartitioner::default(),
377382
security_config: None,
378383
client_id: None,
384+
#[cfg(feature = "producer_timestamp")]
385+
producer_timestamp: None,
379386
};
380387
if let Some(ref c) = b.client {
381388
b.compression = c.compression();
@@ -437,6 +444,16 @@ impl Builder {
437444
self.client_id = Some(client_id);
438445
self
439446
}
447+
448+
#[cfg(feature = "producer_timestamp")]
449+
/// Sets the compression algorithm to use when sending out data.
450+
///
451+
/// See `KafkaClient::set_producer_timestamp`.
452+
#[must_use]
453+
pub fn with_timestamp(mut self, timestamp: ProducerTimestamp) -> Self {
454+
self.producer_timestamp = Some(timestamp);
455+
self
456+
}
440457
}
441458

442459
impl<P> Builder<P> {
@@ -453,6 +470,8 @@ impl<P> Builder<P> {
453470
partitioner,
454471
security_config: None,
455472
client_id: None,
473+
#[cfg(feature = "producer_timestamp")]
474+
producer_timestamp: None,
456475
}
457476
}
458477

@@ -484,6 +503,8 @@ impl<P> Builder<P> {
484503
// ~ apply configuration settings
485504
client.set_compression(self.compression);
486505
client.set_connection_idle_timeout(self.conn_idle_timeout);
506+
#[cfg(feature = "producer_timestamp")]
507+
client.set_producer_timestamp(self.producer_timestamp);
487508
if let Some(client_id) = self.client_id {
488509
client.set_client_id(client_id);
489510
}

0 commit comments

Comments
 (0)