diff --git a/src/main/java/org/tenten/tentenstomp/config/WebSocketConfig.java b/src/main/java/org/tenten/tentenstomp/config/WebSocketConfig.java index 43f8855..e7c595d 100644 --- a/src/main/java/org/tenten/tentenstomp/config/WebSocketConfig.java +++ b/src/main/java/org/tenten/tentenstomp/config/WebSocketConfig.java @@ -1,32 +1,38 @@ package org.tenten.tentenstomp.config; +import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; +import org.tenten.tentenstomp.global.stomp.StompExceptionHandler; +import org.tenten.tentenstomp.global.stomp.StompPreHandler; @Configuration @EnableWebSocket @EnableWebSocketMessageBroker +@RequiredArgsConstructor public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { - + private final StompExceptionHandler stompExceptionHandler; + private final StompPreHandler stompPreHandler; @Override public void registerStompEndpoints(StompEndpointRegistry endpointRegistry) { endpointRegistry.addEndpoint("/ws-stomp")// 소켓 연결 Endpoint 설정 .setAllowedOriginPatterns("http://*:8080", "http://*.*.*.*:8080", "https://jxy.me/", "http://localhost:5173", "https://weplanplans.vercel.app", "https://dev-weplanplans.vercel.app"); -// .withSockJS() -// .setClientLibraryUrl("https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.2/sockjs.js"); // Todo 추후 특정 url 변경 - + endpointRegistry.setErrorHandler(stompExceptionHandler); } @Override public void configureMessageBroker(MessageBrokerRegistry brokerRegistry) { - // 클라이언트가 Server로 메세지 발행 -> @MessageMapping 이 붙어있는 메서드와 연결 brokerRegistry.setApplicationDestinationPrefixes("/pub"); - // 메서드에서 처리된 메세지의 결과를 broker를 통해서 /sub/message 를 구독하고 있는 - // 모든 클라이언트들에게 메세지를 전달 brokerRegistry.enableSimpleBroker("/sub"); } + + @Override + public void configureClientInboundChannel(ChannelRegistration registration) { + registration.interceptors(stompPreHandler); + } } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripController.java b/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripController.java index 443692d..bca585e 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripController.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripController.java @@ -17,57 +17,49 @@ public class TripController { private final TripService tripService; private final KafkaProducer kafkaProducer; + /* + TODO : 백엔드에서 예외가 발생하면, 프론트로 예외 발생하기 전 시점 데이터를 보내줘야하는데, 이걸 어떻게 할 수 있을까 + TODO : 실시간 편집인데, 노션 처럼 누가 어떤 것을 변경했는지 알려줄 필요가 있지 않을까? + */ - @MessageMapping("/kafka") - public void testKafka(@Payload TripUpdateMsg tripUpdateMsg) { - kafkaProducer.send("kafka", tripUpdateMsg); - } @MessageMapping("/trips/{tripId}/connectMember") public void connectMember(@DestinationVariable String tripId, @Payload MemberConnectMsg memberConnectMsg) { - log.info("/trips/"+tripId+"/connectMember"); tripService.connectMember(tripId, memberConnectMsg); } @MessageMapping("/trips/{tripId}/getConnectedMember") public void getConnectedMember(@DestinationVariable String tripId) { - log.info("/trips/"+tripId+"/connectMember"); tripService.getConnectedMember(tripId); } @MessageMapping("/trips/{tripId}/disconnectMember") public void disconnectMember(@DestinationVariable String tripId, @Payload MemberDisconnectMsg memberDisconnectMsg) { - log.info("/trips/"+tripId+"/disconnectMember"); tripService.disconnectMember(tripId, memberDisconnectMsg); } @MessageMapping("/trips/{tripId}/enterMember") public void enterMember(@DestinationVariable String tripId, @Payload MemberConnectMsg memberConnectMsg) { - log.info("/trips/"+tripId+"/enterMember"); tripService.enterMember(tripId, memberConnectMsg); } @MessageMapping("/trips/{tripId}/info") public void editPlan(@DestinationVariable String tripId, @Payload TripUpdateMsg tripUpdateMsg) { - log.info("/trips/"+tripId+"/info"); tripService.updateTrip(tripId, tripUpdateMsg); } @MessageMapping("/trips/{tripId}/addTripItems") public void addTripItem(@DestinationVariable String tripId, @Payload TripItemAddMsg tripItemAddMsg) { - log.info("/trips/"+tripId+"/addTripItems"); tripService.addTripItem(tripId, tripItemAddMsg); } @MessageMapping("/trips/{tripId}/updateTripItemOrder") public void updateTripItemOrder(@DestinationVariable String tripId, @Payload TripItemOrderUpdateMsg orderUpdateMsg) { - log.info("/trips/"+tripId+"/updateTripItemOrder"); tripService.updateTripItemOrder(tripId, orderUpdateMsg); } @MessageMapping("/trips/{tripId}/getPathAndItems") public void getPathAndItems(@DestinationVariable String tripId, @Payload PathAndItemRequestMsg pathAndItemRequestMsg) { - log.info("/trips/"+tripId+"/getPathAndItems"); tripService.getPathAndItems(tripId, pathAndItemRequestMsg); } } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripItemController.java b/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripItemController.java index 59ba43c..87716c7 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripItemController.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/controller/TripItemController.java @@ -19,25 +19,21 @@ public class TripItemController { @MessageMapping("/tripItems/{tripItemId}/updatePrice") public void updateTripItemPrice(@DestinationVariable String tripItemId, @Payload TripItemPriceUpdateMsg priceUpdateMsg) { - log.info("/tripItems/"+tripItemId+"/updatePrice"); tripItemService.updateTripItemPrice(tripItemId, priceUpdateMsg); } @MessageMapping("/tripItems/{tripItemId}/updateVisitDate") public void updateTripItemVisitDate(@DestinationVariable String tripItemId, @Payload TripItemVisitDateUpdateMsg visitDateUpdateMsg) { - log.info("/tripItems/"+tripItemId+"/updateVisitDate"); tripItemService.updateTripItemVisitDate(tripItemId, visitDateUpdateMsg); } @MessageMapping("/tripItems/{tripItemId}/updateTransportation") public void updateTripItemTransportation(@DestinationVariable String tripItemId, @Payload TripItemTransportationUpdateMsg tripItemTransportationUpdateMsg) { - log.info("/tripItems/"+tripItemId+"/updateTransportation"); tripItemService.updateTripItemTransportation(tripItemId, tripItemTransportationUpdateMsg); } @MessageMapping("/tripItems/{tripItemId}/deleteItem") public void deleteTripItem(@DestinationVariable String tripItemId) { - log.info("/tripItems/"+tripItemId+"/deleteItem"); tripItemService.deleteTripItem(tripItemId); } } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java index 2b741b8..7311789 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripItemService.java @@ -6,12 +6,13 @@ import org.tenten.tentenstomp.domain.trip.dto.request.TripItemPriceUpdateMsg; import org.tenten.tentenstomp.domain.trip.dto.request.TripItemTransportationUpdateMsg; import org.tenten.tentenstomp.domain.trip.dto.request.TripItemVisitDateUpdateMsg; -import org.tenten.tentenstomp.domain.trip.dto.response.*; +import org.tenten.tentenstomp.domain.trip.dto.response.TripBudgetMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripItemMsg; +import org.tenten.tentenstomp.domain.trip.dto.response.TripPathMsg; import org.tenten.tentenstomp.domain.trip.entity.Trip; import org.tenten.tentenstomp.domain.trip.entity.TripItem; import org.tenten.tentenstomp.domain.trip.repository.TripItemRepository; import org.tenten.tentenstomp.domain.trip.repository.TripRepository; -import org.tenten.tentenstomp.global.cache.RedisCache; import org.tenten.tentenstomp.global.component.PathComponent; import org.tenten.tentenstomp.global.component.dto.request.TripPlace; import org.tenten.tentenstomp.global.component.dto.response.TripPathCalculationResult; @@ -24,14 +25,12 @@ import java.util.Map; import static org.springframework.http.HttpStatus.NOT_FOUND; -import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*; @Service @RequiredArgsConstructor public class TripItemService { private final TripItemRepository tripItemRepository; private final TripRepository tripRepository; - private final RedisCache redisCache; private final KafkaProducer kafkaProducer; private final PathComponent pathComponent; @Transactional @@ -45,12 +44,12 @@ public void updateTripItemPrice(String tripItemId, TripItemPriceUpdateMsg priceU List tripItems = trip.getTripItems(); TripBudgetMsg tripBudgetMsg = new TripBudgetMsg(trip.getId(), trip.getBudget(), trip.getTripItemPriceSum() + trip.getTransportationPriceSum()); TripItemMsg tripItemMsg = TripItemMsg.fromTripItemList(trip.getId(), tripItem.getVisitDate().toString(), tripItems, tripItem.getId(), priceUpdateMsg); - sendToKafkaAndSave(tripBudgetMsg, tripItemMsg); + kafkaProducer.sendAndSaveToRedis(tripBudgetMsg, tripItemMsg); } @Transactional public void updateTripItemVisitDate(String tripItemId, TripItemVisitDateUpdateMsg visitDateUpdateMsg) { TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND)); - Trip trip = tripRepository.findById(tripItem.getTrip().getId()).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 trip이 없다 " + tripItem.getTrip().getId(), NOT_FOUND)); + Trip trip = tripRepository.getReferenceById(tripItem.getTrip().getId()); LocalDate pastDate = tripItem.getVisitDate(); LocalDate newDate = LocalDate.parse(visitDateUpdateMsg.visitDate()); @@ -90,13 +89,13 @@ public void updateTripItemVisitDate(String tripItemId, TripItemVisitDateUpdateMs TripPathMsg newDateTripPathMsg = new TripPathMsg(trip.getId(), newDate.toString(), newDateTripPath.tripPathInfoMsgs()); TripBudgetMsg tripBudgetMsg = new TripBudgetMsg(trip.getId(), trip.getBudget(), trip.getTripItemPriceSum() + trip.getTransportationPriceSum()); - sendToKafkaAndSave(pastDateTripItemMsg, newDateTripItemMsg, pastDateTripPathMsg, newDateTripPathMsg, tripBudgetMsg); + kafkaProducer.sendAndSaveToRedis(pastDateTripItemMsg, newDateTripItemMsg, pastDateTripPathMsg, newDateTripPathMsg, tripBudgetMsg); } @Transactional public void deleteTripItem(String tripItemId) { TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND)); - Trip trip = tripRepository.findById(tripItem.getTrip().getId()).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 trip이 없다 " + tripItem.getTrip().getId(), NOT_FOUND)); + Trip trip = tripRepository.getReferenceById(tripItem.getTrip().getId()); LocalDate visitDate = tripItem.getVisitDate(); @@ -124,13 +123,13 @@ public void deleteTripItem(String tripItemId) { TripPathMsg tripPathMsg = new TripPathMsg(trip.getId(), visitDate.toString(), tripPath.tripPathInfoMsgs()); TripBudgetMsg tripBudgetMsg = new TripBudgetMsg(trip.getId(), trip.getBudget(), trip.getTripItemPriceSum() + trip.getTransportationPriceSum()); - sendToKafkaAndSave(tripItemMsg, tripPathMsg, tripBudgetMsg); + kafkaProducer.sendAndSaveToRedis(tripItemMsg, tripPathMsg, tripBudgetMsg); } @Transactional public void updateTripItemTransportation(String tripItemId, TripItemTransportationUpdateMsg tripItemTransportationUpdateMsg) { TripItem tripItem = tripItemRepository.findById(Long.parseLong(tripItemId)).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 tripItem이 없다 " + tripItemId, NOT_FOUND)); - Trip trip = tripRepository.findById(tripItem.getTrip().getId()).orElseThrow(() -> new GlobalException("해당 아이디로 존재하는 trip이 없다 " + tripItem.getTrip().getId(), NOT_FOUND)); + Trip trip = tripRepository.getReferenceById(tripItem.getTrip().getId()); LocalDate visitDate = tripItem.getVisitDate(); List tripItems = tripItemRepository.findTripItemByTripIdAndVisitDate(tripItem.getTrip().getId(), visitDate); @@ -153,37 +152,8 @@ public void updateTripItemTransportation(String tripItemId, TripItemTransportati TripPathMsg tripPathMsg = new TripPathMsg(trip.getId(), visitDate.toString(), tripPath.tripPathInfoMsgs()); TripBudgetMsg tripBudgetMsg = new TripBudgetMsg(trip.getId(), trip.getBudget(), trip.getTripItemPriceSum() + trip.getTransportationPriceSum()); - sendToKafkaAndSave(tripItemMsg, tripPathMsg, tripBudgetMsg); + kafkaProducer.sendAndSaveToRedis(tripItemMsg, tripPathMsg, tripBudgetMsg); } - private void sendToKafkaAndSave(Object... dataArgs) { - for (Object data : dataArgs) { - if (data.getClass().equals(TripPathMsg.class)) { - kafkaProducer.send(PATH, data); - TripPathMsg tripPathMsg = (TripPathMsg) data; - redisCache.save(PATH, Long.toString(tripPathMsg.tripId()), tripPathMsg.visitDate(), tripPathMsg); - } - if (data.getClass().equals(TripItemMsg.class)) { - kafkaProducer.send(TRIP_ITEM, data); - TripItemMsg tripItemMsg = (TripItemMsg) data; - redisCache.save(TRIP_ITEM, Long.toString(tripItemMsg.tripId()), tripItemMsg.visitDate(), tripItemMsg); - } - if (data.getClass().equals(TripInfoMsg.class)) { - kafkaProducer.send(TRIP_INFO, data); - TripInfoMsg tripInfoMsg = (TripInfoMsg) data; - redisCache.save(TRIP_INFO, Long.toString(tripInfoMsg.tripId()), tripInfoMsg); - } - if (data.getClass().equals(TripMemberMsg.class)) { - kafkaProducer.send(MEMBER, data); - TripMemberMsg tripMemberMsg = (TripMemberMsg) data; - redisCache.save(MEMBER, Long.toString(tripMemberMsg.tripId()), tripMemberMsg); - } - if (data.getClass().equals(TripBudgetMsg.class)) { - kafkaProducer.send(BUDGET, data); - TripBudgetMsg tripBudgetMsg = (TripBudgetMsg) data; - redisCache.save(BUDGET, Long.toString(tripBudgetMsg.tripId()), tripBudgetMsg); - } - } - } } diff --git a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java index 21e8e0e..967d1b3 100644 --- a/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java +++ b/src/main/java/org/tenten/tentenstomp/domain/trip/service/TripService.java @@ -53,7 +53,7 @@ public void connectMember(String tripId, MemberConnectMsg memberConnectMsg) { Long.parseLong(tripId), connectedMemberMap.values().stream().toList(), memberRepository.findTripMemberInfoByTripId(Long.parseLong(tripId)), trip.getNumberOfPeople() ); tripConnectedMemberMap.put(tripId, connectedMemberMap); - sendToKafkaAndSave(tripMemberMsg); + kafkaProducer.sendAndSaveToRedis(tripMemberMsg); } @Transactional @@ -62,12 +62,12 @@ public void getConnectedMember(String tripId) { Trip trip = tripRepository.getReferenceById(Long.parseLong(tripId)); TripMemberMsg tripMemberMsg = new TripMemberMsg( - Long.parseLong(tripId), - connectedMemberMap.values().stream().toList(), - memberRepository.findTripMemberInfoByTripId(Long.parseLong(tripId)), - trip.getNumberOfPeople() + Long.parseLong(tripId), + connectedMemberMap.values().stream().toList(), + memberRepository.findTripMemberInfoByTripId(Long.parseLong(tripId)), + trip.getNumberOfPeople() ); - sendToKafkaAndSave(tripMemberMsg); + kafkaProducer.sendAndSaveToRedis(tripMemberMsg); } @Transactional @@ -80,7 +80,7 @@ public void disconnectMember(String tripId, MemberDisconnectMsg memberDisconnect Long.parseLong(tripId), connectedMemberMap.values().stream().toList(), memberRepository.findTripMemberInfoByTripId(Long.parseLong(tripId)), trip.getNumberOfPeople() ); tripConnectedMemberMap.put(tripId, connectedMemberMap); - sendToKafkaAndSave(tripMemberMsg); + kafkaProducer.sendAndSaveToRedis(tripMemberMsg); } @@ -106,7 +106,7 @@ public void updateTrip(String tripId, TripUpdateMsg tripUpdateMsg) { ); tripRepository.save(trip); - sendToKafkaAndSave(tripInfoMsg, tripBudgetMsg); + kafkaProducer.sendAndSaveToRedis(tripInfoMsg, tripBudgetMsg); } @@ -138,7 +138,7 @@ private void updateBudgetAndItemsAndPath(Trip trip, List tripItems, St TripItemMsg tripItemMsg = TripItemMsg.fromTripItemList(trip.getId(), visitDate, tripItems); TripPathMsg tripPathMsg = new TripPathMsg(trip.getId(), visitDate, tripPath.tripPathInfoMsgs()); - sendToKafkaAndSave(tripBudgetMsg, tripItemMsg, tripPathMsg); + kafkaProducer.sendAndSaveToRedis(tripBudgetMsg, tripItemMsg, tripPathMsg); } @Transactional @@ -164,36 +164,6 @@ public void getPathAndItems(String tripId, PathAndItemRequestMsg pathAndItemRequ kafkaProducer.send(PATH, getTripPathMsg(trip, pathAndItemRequestMsg.visitDate())); } - private void sendToKafkaAndSave(Object... dataArgs) { - for (Object data : dataArgs) { - if (data.getClass().equals(TripPathMsg.class)) { - kafkaProducer.send(PATH, data); - TripPathMsg tripPathMsg = (TripPathMsg) data; - redisCache.save(PATH, Long.toString(tripPathMsg.tripId()), tripPathMsg.visitDate(), tripPathMsg); - } - if (data.getClass().equals(TripItemMsg.class)) { - kafkaProducer.send(TRIP_ITEM, data); - TripItemMsg tripItemMsg = (TripItemMsg) data; - redisCache.save(TRIP_ITEM, Long.toString(tripItemMsg.tripId()), tripItemMsg.visitDate(), tripItemMsg); - } - if (data.getClass().equals(TripInfoMsg.class)) { - kafkaProducer.send(TRIP_INFO, data); - TripInfoMsg tripInfoMsg = (TripInfoMsg) data; - redisCache.save(TRIP_INFO, Long.toString(tripInfoMsg.tripId()), tripInfoMsg); - } - if (data.getClass().equals(TripMemberMsg.class)) { - kafkaProducer.send(MEMBER, data); - TripMemberMsg tripMemberMsg = (TripMemberMsg) data; - redisCache.save(MEMBER, Long.toString(tripMemberMsg.tripId()), tripMemberMsg); - } - if (data.getClass().equals(TripBudgetMsg.class)) { - kafkaProducer.send(BUDGET, data); - TripBudgetMsg tripBudgetMsg = (TripBudgetMsg) data; - redisCache.save(BUDGET, Long.toString(tripBudgetMsg.tripId()), tripBudgetMsg); - } - } - } - private TripMemberMsg getTripMemberMsg(String tripId) { Object cached = redisCache.get(MEMBER, tripId); if (cached != null) { diff --git a/src/main/java/org/tenten/tentenstomp/global/cache/RedisCache.java b/src/main/java/org/tenten/tentenstomp/global/cache/RedisCache.java index 922e655..f7d0855 100644 --- a/src/main/java/org/tenten/tentenstomp/global/cache/RedisCache.java +++ b/src/main/java/org/tenten/tentenstomp/global/cache/RedisCache.java @@ -3,10 +3,8 @@ import lombok.RequiredArgsConstructor; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import org.tenten.tentenstomp.global.common.constant.RedisConstant; - -import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.tenten.tentenstomp.global.common.constant.RedisConstant.CACHE_EXPIRE_TIME_MS; @Component @@ -15,19 +13,27 @@ public class RedisCache { private final RedisTemplate redisTemplate; public void save(String topic, String key, Object data) { - redisTemplate.opsForValue().set(topic+":"+key, data, CACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS); + redisTemplate.opsForValue().set(topic+":"+key, data, CACHE_EXPIRE_TIME_MS, MILLISECONDS); } public void save(String topic, String key, String visitDate, Object data) { - redisTemplate.opsForValue().set(topic + ":" + key + ":" + visitDate, data, CACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS); + redisTemplate.opsForValue().set(topic + ":" + key + ":" + visitDate, data, CACHE_EXPIRE_TIME_MS, MILLISECONDS); } public Object get(String topic, String key) { - return redisTemplate.opsForValue().get(topic+":"+key); + Object cached = redisTemplate.opsForValue().get(topic + ":" + key); + if (cached != null) { + redisTemplate.expire(topic + ":" + key, CACHE_EXPIRE_TIME_MS, MILLISECONDS); + } + return cached; } public Object get(String topic, String key, String visitDate) { - return redisTemplate.opsForValue().get(topic+":"+key+":"+visitDate); + Object cached = redisTemplate.opsForValue().get(topic + ":" + key + ":" + visitDate); + if (cached != null) { + redisTemplate.expire(topic + ":" + key + ":" + visitDate, CACHE_EXPIRE_TIME_MS, MILLISECONDS); + } + return cached; } public void delete(String topic, String key) { diff --git a/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java b/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java index b4ef988..0380e33 100644 --- a/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java +++ b/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/consumer/KafkaConsumer.java @@ -5,7 +5,6 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.stereotype.Service; -import org.tenten.tentenstomp.domain.trip.dto.request.TripUpdateMsg; import org.tenten.tentenstomp.domain.trip.dto.response.*; import org.tenten.tentenstomp.global.response.GlobalStompResponse; import org.tenten.tentenstomp.global.util.TopicUtil; @@ -23,11 +22,6 @@ public class KafkaConsumer { private final SimpMessageSendingOperations messagingTemplate; private final TopicUtil topicUtil; - @KafkaListener(topics = "kafka", groupId = GROUP_ID_CONFIG) - public void consumeTest(TripUpdateMsg data) { - messagingTemplate.convertAndSend("/sub/kafka", data); - } - @KafkaListener(topics = TRIP_INFO, groupId = GROUP_ID_CONFIG) public void updateTripInfo(TripInfoMsg tripInfoMsg) { String destination = topicUtil.topicToReturnEndPoint(tripInfoMsg.tripId(), TRIP_INFO); diff --git a/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/producer/KafkaProducer.java b/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/producer/KafkaProducer.java index 8045e2c..6afaf1f 100644 --- a/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/producer/KafkaProducer.java +++ b/src/main/java/org/tenten/tentenstomp/global/messaging/kafka/producer/KafkaProducer.java @@ -3,14 +3,49 @@ import lombok.RequiredArgsConstructor; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; +import org.tenten.tentenstomp.domain.trip.dto.response.*; +import org.tenten.tentenstomp.global.cache.RedisCache; + +import static org.tenten.tentenstomp.global.common.constant.TopicConstant.*; @Service @RequiredArgsConstructor public class KafkaProducer { private final KafkaTemplate kafkaTemplate; + private final RedisCache redisCache; public void send(String to, Object data) { kafkaTemplate.send(to, data); } + public void sendAndSaveToRedis(Object... dataArgs) { + for (Object data : dataArgs) { + if (data.getClass().equals(TripPathMsg.class)) { + send(PATH, data); + TripPathMsg tripPathMsg = (TripPathMsg) data; + redisCache.save(PATH, Long.toString(tripPathMsg.tripId()), tripPathMsg.visitDate(), tripPathMsg); + } + if (data.getClass().equals(TripItemMsg.class)) { + send(TRIP_ITEM, data); + TripItemMsg tripItemMsg = (TripItemMsg) data; + redisCache.save(TRIP_ITEM, Long.toString(tripItemMsg.tripId()), tripItemMsg.visitDate(), tripItemMsg); + } + if (data.getClass().equals(TripInfoMsg.class)) { + send(TRIP_INFO, data); + TripInfoMsg tripInfoMsg = (TripInfoMsg) data; + redisCache.save(TRIP_INFO, Long.toString(tripInfoMsg.tripId()), tripInfoMsg); + } + if (data.getClass().equals(TripMemberMsg.class)) { + send(MEMBER, data); + TripMemberMsg tripMemberMsg = (TripMemberMsg) data; + redisCache.save(MEMBER, Long.toString(tripMemberMsg.tripId()), tripMemberMsg); + } + if (data.getClass().equals(TripBudgetMsg.class)) { + send(BUDGET, data); + TripBudgetMsg tripBudgetMsg = (TripBudgetMsg) data; + redisCache.save(BUDGET, Long.toString(tripBudgetMsg.tripId()), tripBudgetMsg); + } + } + } + } diff --git a/src/main/java/org/tenten/tentenstomp/global/stomp/StompExceptionHandler.java b/src/main/java/org/tenten/tentenstomp/global/stomp/StompExceptionHandler.java new file mode 100644 index 0000000..cc4d5c9 --- /dev/null +++ b/src/main/java/org/tenten/tentenstomp/global/stomp/StompExceptionHandler.java @@ -0,0 +1,32 @@ +package org.tenten.tentenstomp.global.stomp; + +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.messaging.StompSubProtocolErrorHandler; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.springframework.messaging.simp.stomp.StompCommand.ERROR; + +@Component +public class StompExceptionHandler extends StompSubProtocolErrorHandler { + @Override + public Message handleClientMessageProcessingError(Message clientMessage, Throwable ex) { + if (ex.getMessage().equals("UNAUTHORIZED")) { + return errorMessage("유효하지 않은 권한"); + } + + return super.handleClientMessageProcessingError(clientMessage, ex); + } + + + private Message errorMessage(String errorMessage) { + + StompHeaderAccessor accessor = StompHeaderAccessor.create(ERROR); + accessor.setLeaveMutable(true); + + return MessageBuilder.createMessage(errorMessage.getBytes(UTF_8), + accessor.getMessageHeaders()); + } +} diff --git a/src/main/java/org/tenten/tentenstomp/global/stomp/StompPreHandler.java b/src/main/java/org/tenten/tentenstomp/global/stomp/StompPreHandler.java new file mode 100644 index 0000000..cf12648 --- /dev/null +++ b/src/main/java/org/tenten/tentenstomp/global/stomp/StompPreHandler.java @@ -0,0 +1,23 @@ +package org.tenten.tentenstomp.global.stomp; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class StompPreHandler implements ChannelInterceptor { + @Override + public Message preSend(Message message, MessageChannel channel) { + StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message); + log.info("msgType : " + headerAccessor.getMessageType().toString() + " dest : " + headerAccessor.getDestination()); +// String authorizationHeader = String.valueOf(headerAccessor.getNativeHeader("Authorization")); +// if (authorizationHeader == null || authorizationHeader.equals("null")) { +// throw new MessageDeliveryException("UNAUTHORIZED"); +// } + return ChannelInterceptor.super.preSend(message, channel); + } +}