diff --git a/CHANGES.md b/CHANGES.md index b517839ee3..f5ebc460d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,7 @@ Release Notes. * Fix possible IllegalStateException when using Micrometer. * Support Grizzly Work ThreadPool Metric Monitor * Fix the gson dependency in the kafka-reporter-plugin. +* Fix deserialization of kafka producer json config in the kafka-reporter-plugin. * Support to config custom decode methods for kafka configurations #### Documentation diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index 5bb66e45ca..31dc8482f1 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -18,8 +18,8 @@ package org.apache.skywalking.apm.agent.core.kafka; +import com.google.gson.reflect.TypeToken; import com.google.gson.Gson; - import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.HashSet; @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; - import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.producer.KafkaProducer; @@ -107,11 +106,7 @@ public void run() { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS); - if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) { - Gson gson = new Gson(); - Map config = (Map) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class); - decode(config).forEach(properties::setProperty); - } + setPropertiesFromJsonConfig(properties); decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty); try (AdminClient adminClient = AdminClient.create(properties)) { @@ -131,12 +126,12 @@ public void run() { }) .filter(Objects::nonNull) .collect(Collectors.toSet()); - + if (!topics.isEmpty()) { LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics); return; } - + try { producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer()); } catch (Exception e) { @@ -149,6 +144,15 @@ public void run() { } } + void setPropertiesFromJsonConfig(Properties properties) { + if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) { + Gson gson = new Gson(); + Map config = gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, + new TypeToken>() { }.getType()); + decode(config).forEach(properties::setProperty); + } + } + private void notifyListeners(KafkaConnectionStatus status) { for (KafkaConnectionStatusListener listener : listeners) { listener.onStatusChanged(status); diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java index 4317fb03cc..3a6b7c2d49 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java @@ -18,16 +18,15 @@ package org.apache.skywalking.apm.agent.core.kafka; -import org.junit.Test; - +import static org.junit.Assert.assertEquals; import java.lang.reflect.Method; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; public class KafkaProducerManagerTest { @Test @@ -39,8 +38,8 @@ public void testAddListener() throws Exception { kafkaProducerManager.addListener(new MockListener(counter)); } Method notifyListeners = kafkaProducerManager - .getClass() - .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class); + .getClass() + .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class); notifyListeners.setAccessible(true); notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED); @@ -60,6 +59,17 @@ public void testFormatTopicNameThenRegister() { assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value); } + @Test + public void testSetPropertiesFromJsonConfig() { + KafkaProducerManager kafkaProducerManager = new KafkaProducerManager(); + Properties properties = new Properties(); + + KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}"; + kafkaProducerManager.setPropertiesFromJsonConfig(properties); + + assertEquals(properties.get("batch.size"), "32768"); + } + @Test public void testDecode() throws Exception { KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";