Skip to content

Commit ca00938

Browse files
committed
Migrate to kafka 2.1
1 parent 8f5230b commit ca00938

File tree

17 files changed

+41
-46
lines changed

17 files changed

+41
-46
lines changed

core/src/main/java/io/quicksign/kafka/crypto/CryptoDeserializer.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.kafka.common.header.Headers;
3030
import org.apache.kafka.common.header.internals.RecordHeaders;
3131
import org.apache.kafka.common.serialization.Deserializer;
32-
import org.apache.kafka.common.serialization.ExtendedDeserializer;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534

@@ -56,18 +55,18 @@
5655
* @see CryptoSerializer
5756
* @see Decryptor
5857
*/
59-
public class CryptoDeserializer<T> implements ExtendedDeserializer<T> {
58+
public class CryptoDeserializer<T> implements Deserializer<T> {
6059

6160
private static final Logger log = LoggerFactory.getLogger(CryptoDeserializer.class);
6261

63-
private final ExtendedDeserializer<? extends T> rawDeserializer;
62+
private final Deserializer<? extends T> rawDeserializer;
6463
private final Decryptor decryptor;
6564

6665
/**
6766
* @param rawDeserializer deserializer to deserialize clear data
6867
* @param decryptor Decryptor used to decrypt the data
6968
*/
70-
public CryptoDeserializer(ExtendedDeserializer<? extends T> rawDeserializer, Decryptor decryptor) {
69+
public CryptoDeserializer(Deserializer<? extends T> rawDeserializer, Decryptor decryptor) {
7170

7271
this.rawDeserializer = rawDeserializer;
7372
this.decryptor = decryptor;

core/src/main/java/io/quicksign/kafka/crypto/CryptoDeserializerFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package io.quicksign.kafka.crypto;
2121

2222
import org.apache.kafka.common.serialization.Deserializer;
23-
import org.apache.kafka.common.serialization.ExtendedDeserializer;
2423

2524
public class CryptoDeserializerFactory {
2625

@@ -32,6 +31,6 @@ public CryptoDeserializerFactory(Decryptor decryptor) {
3231
}
3332

3433
public <T> CryptoDeserializer<T> buildFrom(Deserializer<T> rawDeserializer) {
35-
return new CryptoDeserializer<>(ExtendedDeserializer.Wrapper.ensureExtended(rawDeserializer), decryptor);
34+
return new CryptoDeserializer<>(rawDeserializer, decryptor);
3635
}
3736
}

core/src/main/java/io/quicksign/kafka/crypto/CryptoSerializer.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import org.apache.kafka.common.header.Header;
3030
import org.apache.kafka.common.header.Headers;
31-
import org.apache.kafka.common.serialization.ExtendedSerializer;
3231
import org.apache.kafka.common.serialization.Serializer;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
@@ -57,12 +56,12 @@
5756
*
5857
* @param <T>
5958
*/
60-
public class CryptoSerializer<T> implements ExtendedSerializer<T> {
59+
public class CryptoSerializer<T> implements Serializer<T> {
6160

6261
private static final Logger log = LoggerFactory.getLogger(CryptoSerializer.class);
6362

6463

65-
private final ExtendedSerializer<? super T> rawSerializer;
64+
private final Serializer<? super T> rawSerializer;
6665
private final Encryptor encryptor;
6766
private final ThreadLocal<byte[]> keyRefHolder;
6867

@@ -71,7 +70,7 @@ public class CryptoSerializer<T> implements ExtendedSerializer<T> {
7170
* @param encryptor {@link Encryptor} to encrypt data
7271
* @param keyRefHolder {@link ThreadLocal} used to communicate the key reference when using Kafka Stream (unused for regular Kafka Producer)
7372
*/
74-
public CryptoSerializer(ExtendedSerializer<? super T> rawSerializer, Encryptor encryptor, ThreadLocal<byte[]> keyRefHolder) {
73+
public CryptoSerializer(Serializer<? super T> rawSerializer, Encryptor encryptor, ThreadLocal<byte[]> keyRefHolder) {
7574
this.rawSerializer = rawSerializer;
7675
this.encryptor = encryptor;
7776
this.keyRefHolder = keyRefHolder;

core/src/main/java/io/quicksign/kafka/crypto/CryptoSerializerFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
*/
2020
package io.quicksign.kafka.crypto;
2121

22-
import org.apache.kafka.common.serialization.ExtendedSerializer;
2322
import org.apache.kafka.common.serialization.Serializer;
2423

2524
public class CryptoSerializerFactory {
@@ -32,6 +31,6 @@ public CryptoSerializerFactory(Encryptor encryptor) {
3231
}
3332

3433
public <T> CryptoSerializer<T> buildFrom(Serializer<T> rawSerializer) {
35-
return new CryptoSerializer<>(ExtendedSerializer.Wrapper.ensureExtended(rawSerializer), encryptor, null);
34+
return new CryptoSerializer<>(rawSerializer, encryptor, null);
3635
}
3736
}

core/src/main/java/io/quicksign/kafka/crypto/pairing/internal/CryptoAwareSerializerWrapper.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Map;
2323

2424
import org.apache.kafka.common.header.Headers;
25-
import org.apache.kafka.common.serialization.ExtendedSerializer;
2625
import org.apache.kafka.common.serialization.Serializer;
2726

2827
import io.quicksign.kafka.crypto.KafkaCryptoConstants;
@@ -34,9 +33,9 @@
3433
*
3534
* @param <T>
3635
*/
37-
public class CryptoAwareSerializerWrapper<T> implements ExtendedSerializer<T> {
36+
public class CryptoAwareSerializerWrapper<T> implements Serializer<T> {
3837

39-
private final ExtendedSerializer<T> rawSerializer;
38+
private final Serializer<T> rawSerializer;
4039
private final KeyReferenceExtractor keyReferenceExtractor;
4140
private final ThreadLocal<byte[]> keyRefHolder;
4241

@@ -46,7 +45,7 @@ public class CryptoAwareSerializerWrapper<T> implements ExtendedSerializer<T> {
4645
* @param keyRefHolder the ThreadLocal to share the keyref (only used in the context of a Kafka Stream)
4746
*/
4847
public CryptoAwareSerializerWrapper(Serializer<T> rawSerializer, KeyReferenceExtractor keyReferenceExtractor, ThreadLocal<byte[]> keyRefHolder) {
49-
this.rawSerializer = ExtendedSerializer.Wrapper.ensureExtended(rawSerializer);
48+
this.rawSerializer = rawSerializer;
5049
this.keyReferenceExtractor = keyReferenceExtractor;
5150
this.keyRefHolder = keyRefHolder;
5251
}

core/src/main/java/io/quicksign/kafka/crypto/pairing/package-info.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* and put in the context the cryptographic key reference, that will be used to encrypt the record value.</p>
3030
*
3131
* <p>This based on the fact that {@link org.apache.kafka.clients.producer.KafkaProducer#doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) KafkaProducer}
32-
* and {@link org.apache.kafka.streams.processor.internals.RecordCollectorImpl#send(java.lang.String, java.lang.Object, java.lang.Object, java.lang.Integer, java.lang.Long, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer) RecordCollector}
32+
* and {@link org.apache.kafka.streams.processor.internals.RecordCollectorImpl#send(java.lang.String, java.lang.Object, java.lang.Object, org.apache.kafka.common.header.Headers, java.lang.Integer, java.lang.Long, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer) RecordCollector}
3333
* (for streams)
3434
* call key serialization before value serialization.
3535
* </p>

core/src/main/java/io/quicksign/kafka/crypto/pairing/serdes/CryptoSerdeFactory.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
*/
2020
package io.quicksign.kafka.crypto.pairing.serdes;
2121

22-
import org.apache.kafka.common.serialization.ExtendedDeserializer;
23-
import org.apache.kafka.common.serialization.ExtendedSerializer;
2422
import org.apache.kafka.common.serialization.Serde;
2523
import org.apache.kafka.common.serialization.Serdes;
2624

@@ -65,11 +63,8 @@ public <T> Serde<T> buildFrom(Serde<T> rawSerde) {
6563
}
6664

6765
private <T> Serde<T> buildFrom(Serde<T> rawSerde, ThreadLocal<byte[]> keyRefHolder) {
68-
ExtendedDeserializer<T> rawExtendedDeserializer = ExtendedDeserializer.Wrapper.ensureExtended(rawSerde.deserializer());
69-
ExtendedSerializer<T> rawExtendedSerializer = ExtendedSerializer.Wrapper.ensureExtended(rawSerde.serializer());
70-
71-
return Serdes.serdeFrom(new CryptoSerializer<>(rawExtendedSerializer, encryptor, keyRefHolder),
72-
new CryptoDeserializer<>(rawExtendedDeserializer, decryptor));
66+
return Serdes.serdeFrom(new CryptoSerializer<>(rawSerde.serializer(), encryptor, keyRefHolder),
67+
new CryptoDeserializer<>(rawSerde.deserializer(), decryptor));
7368
}
7469

7570
/**

core/src/main/java/io/quicksign/kafka/crypto/pairing/serdes/SerdesPair.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
package io.quicksign.kafka.crypto.pairing.serdes;
2121

2222
import org.apache.kafka.common.serialization.Serde;
23-
import org.apache.kafka.streams.Consumed;
23+
import org.apache.kafka.streams.kstream.Consumed;
24+
import org.apache.kafka.streams.kstream.Grouped;
2425
import org.apache.kafka.streams.kstream.Materialized;
2526
import org.apache.kafka.streams.kstream.Produced;
2627
import org.apache.kafka.streams.kstream.Serialized;
@@ -60,6 +61,13 @@ public Serialized<K, V> toSerialized() {
6061
return Serialized.with(keySerde, valueSerde);
6162
}
6263

64+
/**
65+
* Build a {@link Grouped} using the keySerde and valueSerde of the pair
66+
*/
67+
public Grouped<K, V> toGrouped() {
68+
return Grouped.with(keySerde, valueSerde);
69+
}
70+
6371
/**
6472
* Build a {@link Produced} using the keySerde and valueSerde of the pair
6573
*

core/src/main/java/io/quicksign/kafka/crypto/pairing/serializer/CryptoSerializerPairFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
*/
2020
package io.quicksign.kafka.crypto.pairing.serializer;
2121

22-
import org.apache.kafka.common.serialization.ExtendedSerializer;
2322
import org.apache.kafka.common.serialization.Serializer;
2423

2524
import io.quicksign.kafka.crypto.CryptoSerializer;
@@ -51,7 +50,7 @@ public CryptoSerializerPairFactory(Encryptor encryptor, KeyReferenceExtractor ke
5150
@Override
5251
public <K, V> SerializerPair<K, V> build(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
5352
Serializer<K> newKeySerializer = new CryptoAwareSerializerWrapper<K>(keySerializer, keyReferenceExtractor, null);
54-
Serializer<V> newvalueSerializer = new CryptoSerializer<>(ExtendedSerializer.Wrapper.ensureExtended(valueSerializer), encryptor, null);
53+
Serializer<V> newvalueSerializer = new CryptoSerializer<>(valueSerializer, encryptor, null);
5554
return new SerializerPair<>(newKeySerializer, newvalueSerializer);
5655
}
5756
}

core/src/test/java/io/quicksign/kafka/crypto/CryptoDeserializerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
import org.apache.kafka.common.header.Headers;
3030
import org.apache.kafka.common.header.internals.RecordHeaders;
31-
import org.apache.kafka.common.serialization.ExtendedDeserializer;
31+
import org.apache.kafka.common.serialization.Deserializer;
3232
import org.junit.Test;
3333
import org.junit.runner.RunWith;
3434
import org.mockito.InjectMocks;
@@ -42,7 +42,7 @@ public class CryptoDeserializerTest {
4242
Decryptor decryptor;
4343

4444
@Mock
45-
ExtendedDeserializer<String> rawDeserializer;
45+
Deserializer<String> rawDeserializer;
4646

4747
@InjectMocks
4848
CryptoDeserializer<String> cryptoDeserializer;

core/src/test/java/io/quicksign/kafka/crypto/CryptoSerializerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
import org.apache.kafka.common.header.Headers;
3131
import org.apache.kafka.common.header.internals.RecordHeaders;
32-
import org.apache.kafka.common.serialization.ExtendedSerializer;
32+
import org.apache.kafka.common.serialization.Serializer;
3333
import org.junit.Test;
3434
import org.junit.runner.RunWith;
3535
import org.mockito.InjectMocks;
@@ -40,7 +40,7 @@
4040
public class CryptoSerializerTest {
4141

4242
@Mock
43-
ExtendedSerializer<String> rawSerializer;
43+
Serializer<String> rawSerializer;
4444

4545
@Mock
4646
Encryptor encryptor;

doc/running-docker-kafka.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ You need to have a running Kafka in order to run the examples, here's how you ca
66
77
[source,indent=0]
88
....
9-
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:cp4.0.0
9+
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:2.0.1
1010
....
1111
1212
*On linux*
1313
1414
[source,indent=0]
1515
....
16-
docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:cp4.0.0
16+
docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:2.0.1
1717
....
1818
====

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<maven.compiler.target>1.8</maven.compiler.target>
5454
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
5555

56-
<kafka.version>1.1.0</kafka.version>
56+
<kafka.version>2.1.0</kafka.version>
5757
</properties>
5858

5959
<distributionManagement>

samples/generatedkey-sample/docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ version: '3'
1414

1515
services:
1616
kafka:
17-
image: landoop/fast-data-dev:cp4.0.0
17+
image: landoop/fast-data-dev:2.0.1
1818
network_mode: "host"

samples/kafkastream-with-keyrepo-sample/docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ version: '3'
1414

1515
services:
1616
kafka:
17-
image: landoop/fast-data-dev:cp4.0.0
17+
image: landoop/fast-data-dev:2.0.1
1818
network_mode: "host"

samples/kafkastream-with-keyrepo-sample/src/main/java/io/quicksign/kafka/crypto/samples/stream/keyrepo/SampleStream.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
*/
2020
package io.quicksign.kafka.crypto.samples.stream.keyrepo;
2121

22-
import java.util.HashMap;
23-
import java.util.Map;
22+
import java.util.Properties;
2423

2524
import org.apache.kafka.common.serialization.Serdes;
2625
import org.apache.kafka.streams.KafkaStreams;
@@ -72,12 +71,11 @@ public void run() {
7271
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(name + "_balance");
7372

7473

75-
Map<String, Object> props = new HashMap<>();
76-
props.put(StreamsConfig.APPLICATION_ID_CONFIG, name);
77-
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
78-
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
79-
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
80-
StreamsConfig config = new StreamsConfig(props);
74+
Properties props = new Properties();
75+
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, name);
76+
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
77+
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
78+
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
8179

8280

8381
StreamsBuilder streamsBuilder = new StreamsBuilder();
@@ -90,7 +88,7 @@ public void run() {
9088
.reduce((s1, s2) -> "" + (Integer.valueOf(s1) + Integer.valueOf(s2)),
9189
serdesPair.applyTo(Materialized.as(storeSupplier)));
9290

93-
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), config);
91+
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
9492
kafkaStreams.start();
9593

9694
// end::stream[]

samples/keyrepo-sample/docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ version: '3'
1414

1515
services:
1616
kafka:
17-
image: landoop/fast-data-dev:cp4.0.0
17+
image: landoop/fast-data-dev:2.0.1
1818
network_mode: "host"

0 commit comments

Comments
 (0)