From 9dbe4b6e8409418420fa7404f664a2ede5d5d6ab Mon Sep 17 00:00:00 2001 From: Kim-Dong-Jun99 Date: Sun, 7 Jan 2024 19:38:08 +0900 Subject: [PATCH] =?UTF-8?q?Refactor=20:=20=EB=B0=B0=ED=8F=AC=20=EC=84=9C?= =?UTF-8?q?=EB=B2=84=20kafka=EB=A1=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/KafkaConsumerConfig.java | 14 +++++-- .../config/KafkaProducerConfig.java | 12 ++++-- .../tentenstomp/config/KafkaTopicConfig.java | 38 +++++++++++++++++++ 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tenten/tentenstomp/config/KafkaConsumerConfig.java b/src/main/java/org/tenten/tentenstomp/config/KafkaConsumerConfig.java index 5bc947f..e362d0c 100644 --- a/src/main/java/org/tenten/tentenstomp/config/KafkaConsumerConfig.java +++ b/src/main/java/org/tenten/tentenstomp/config/KafkaConsumerConfig.java @@ -2,6 +2,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; @@ -13,9 +15,14 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; + @EnableKafka @Configuration public class KafkaConsumerConfig { + @Value("${kafka.consumer}") + private String host; + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -29,9 +36,10 @@ public ConsumerFactory consumerFactory() { deserializer.trustedPackages("*"); // Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정 Map consumerConfigMap = new HashMap<>(); - consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer); - consumerConfigMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + consumerConfigMap.put(BOOTSTRAP_SERVERS_CONFIG, host); + consumerConfigMap.put(KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); + consumerConfigMap.put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); + consumerConfigMap.put(AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<>(consumerConfigMap, new StringDeserializer(), deserializer); } diff --git a/src/main/java/org/tenten/tentenstomp/config/KafkaProducerConfig.java b/src/main/java/org/tenten/tentenstomp/config/KafkaProducerConfig.java index 6fa59e9..3958a88 100644 --- a/src/main/java/org/tenten/tentenstomp/config/KafkaProducerConfig.java +++ b/src/main/java/org/tenten/tentenstomp/config/KafkaProducerConfig.java @@ -2,6 +2,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; @@ -13,9 +14,14 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.clients.producer.ProducerConfig.*; + @EnableKafka @Configuration public class KafkaProducerConfig { + + @Value("${kafka.producer}") + private String host; @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigurations()); @@ -24,9 +30,9 @@ public ProducerFactory producerFactory() { @Bean public Map producerConfigurations() { HashMap producerConfigMap = new HashMap<>(); - producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + producerConfigMap.put(BOOTSTRAP_SERVERS_CONFIG, host); + producerConfigMap.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfigMap.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return producerConfigMap; } diff --git a/src/main/java/org/tenten/tentenstomp/config/KafkaTopicConfig.java b/src/main/java/org/tenten/tentenstomp/config/KafkaTopicConfig.java index 0e61597..464855d 100644 --- a/src/main/java/org/tenten/tentenstomp/config/KafkaTopicConfig.java +++ b/src/main/java/org/tenten/tentenstomp/config/KafkaTopicConfig.java @@ -1,13 +1,51 @@ package org.tenten.tentenstomp.config; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; +import org.tenten.tentenstomp.global.common.constant.TopicConstant; + +import java.util.HashMap; +import java.util.Map; + +import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*; @Configuration public class KafkaTopicConfig { + @Value("${kafka.producer}") + private String host; @Bean public NewTopic newTopic() { return new NewTopic("kafka", 1, (short) 1); } + + @Bean + public NewTopic tripInfo() { + return new NewTopic(TRIP_INFO, 1, (short) 1); + } + + @Bean + public NewTopic tripItem() { + return new NewTopic(TRIP_ITEM, 1, (short) 1); + } + + @Bean + public NewTopic path() { + return new NewTopic(PATH, 1, (short) 1); + } + + @Bean + public NewTopic connectedMember() { + return new NewTopic(MEMBER, 1, (short) 1); + } + + @Bean + public KafkaAdmin kafkaAdmin() { + Map adminConfigMap = new HashMap<>(); + adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host); + return new KafkaAdmin(adminConfigMap); + } }