From 95bc535448b1af060124b2c16e71601398d5e4ef Mon Sep 17 00:00:00 2001 From: hawardShin Date: Mon, 23 Sep 2024 12:06:39 +0900 Subject: [PATCH 1/7] Feat: SQS listener in global Context --- .../listener/SqsListenerControlService.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java b/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java index 839c20e..ef1edab 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java @@ -1,5 +1,7 @@ package mejai.mejaigg.messaging.sqs.listener; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer; @@ -9,17 +11,24 @@ @Service @RequiredArgsConstructor @Slf4j -public class SqsListenerControlService { - private final SqsMessageListenerContainer sqsMessageListenerContainer; +public class SqsListenerControlService implements ApplicationContextAware { + private static ApplicationContext context; - public void stopListener() { - sqsMessageListenerContainer.stop(); - log.info("SQS Listener has been stopped."); + @Override + public void setApplicationContext(ApplicationContext ctx) { + context = ctx; } - public void startListener() { - sqsMessageListenerContainer.start(); - log.info("SQS Listener has been started."); + public static void requestStop() { + SqsMessageListenerContainer container = context.getBean(SqsMessageListenerContainer.class); + container.stop(); + log.info("SQS Listener has been stopped due to an error during message processing."); } + public static void requestStart() { + SqsMessageListenerContainer container = context.getBean(SqsMessageListenerContainer.class); + container.start(); + log.info("SQS Listener has been started."); + } + } From 5bd6839aca45422b448164604209c6a82e802106 Mon Sep 17 00:00:00 2001 From: hawardShin Date: Tue, 24 Sep 2024 09:45:40 +0900 Subject: [PATCH 2/7] Feat: Set Sqs acknowledgementMode --- .../messaging/sqs/config/SqsListenerConfig.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java b/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java index 43b5805..3ad3c92 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java @@ -7,12 +7,15 @@ import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import mejai.mejaigg.messaging.sqs.listener.MyMessageListener; import software.amazon.awssdk.services.sqs.SqsAsyncClient; @Configuration @RequiredArgsConstructor +@Slf4j public class SqsListenerConfig { private final MyMessageListener myMessageListener; @@ -21,14 +24,21 @@ public SqsMessageListenerContainer sqsMessageListenerContainer(SqsAsyncC SqsMessageListenerContainerFactory factory = SqsMessageListenerContainerFactory.builder() .sqsAsyncClient(sqsAsyncClient) .configure(options -> options - .maxConcurrentMessages(10) - .pollTimeout(Duration.ofSeconds(10))) + .acknowledgementMode(AcknowledgementMode.MANUAL) // 수동 확인 모드 + .maxConcurrentMessages(10) // 최대 동시 메시지 처리 수 + .pollTimeout(Duration.ofSeconds(10))) // 폴링 타임아웃 .build(); // SqsMessageListenerContainer 생성 SqsMessageListenerContainer container = factory.createContainer("mejai-renewal-sqs"); // MessageListener 설정 - container.setMessageListener(myMessageListener); + container.setMessageListener((message) -> { + try { + myMessageListener.onMessage(message); // 메시지 처리 명시적으로 메시지 삭제 진행 + } catch (Exception e) { + log.warn("Failed to process message: " + e.getMessage()); + } + }); return container; } From 5774fbdcdfd9924ea75d144b4d25cc39730f4124 Mon Sep 17 00:00:00 2001 From: hawardShin Date: Tue, 24 Sep 2024 10:19:34 +0900 Subject: [PATCH 3/7] Feat: Set Sqs Listener config to static --- src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java | 5 ++--- .../messaging/sqs/listener/SqsListenerControlService.java | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java b/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java index 4508a01..05b4fdd 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java @@ -27,7 +27,6 @@ public class SqsController { private final MessageSender messageSender; - private final SqsListenerControlService listenerControlService; private final ObjectMapper objectMapper; @GetMapping("/post") @@ -53,13 +52,13 @@ public List streak(@RequestBody UserStreakRequest request) throws @PostMapping("/start") public String startListener() { - listenerControlService.startListener(); + SqsListenerControlService.requestStart(); return "SQS Listener started."; } @PostMapping("/stop") public String stopListener() { - listenerControlService.stopListener(); + SqsListenerControlService.requestStop(); return "SQS Listener stopped."; } } diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java b/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java index ef1edab..031884f 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java @@ -30,5 +30,4 @@ public static void requestStart() { container.start(); log.info("SQS Listener has been started."); } - } From 83cf76c0ebfaf173e7db8b39dc7b09c5c2fa73b6 Mon Sep 17 00:00:00 2001 From: hawardShin Date: Tue, 24 Sep 2024 10:29:20 +0900 Subject: [PATCH 4/7] Feat: handle sqs Message --- .../sqs/listener/MyMessageListener.java | 64 +++++++++++++++---- 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java b/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java index 362e199..632861c 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java @@ -7,14 +7,24 @@ import io.awspring.cloud.sqs.listener.MessageListener; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import mejai.mejaigg.matchstreak.service.StreakService; +import mejai.mejaigg.riot.exception.ClientErrorCode; +import mejai.mejaigg.riot.exception.ClientException; import mejai.mejaigg.summoner.dto.request.UserProfileRequest; import mejai.mejaigg.summoner.dto.request.UserStreakRequest; +import mejai.mejaigg.summoner.service.ProfileService; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; @RequiredArgsConstructor @Component +@Slf4j public class MyMessageListener implements MessageListener { private final ObjectMapper objectMapper; - // private final ProfileService profileService; + private final ProfileService profileService; + private final StreakService streakService; + private final SqsAsyncClient sqsAsyncClient; @Override public void onMessage(Message message) { @@ -24,25 +34,55 @@ public void onMessage(Message message) { // 메시지 타입에 따라 처리 if (payload.contains("year") && payload.contains("month")) { UserStreakRequest request = objectMapper.readValue(payload, UserStreakRequest.class); - handleUserStreakRequest(request); + streakService.refreshStreak(request); } else { UserProfileRequest request = objectMapper.readValue(payload, UserProfileRequest.class); - handleUserProfileRequest(request); + profileService.refreshUserProfileByNameTag(request.getId(), + request.getTag()); + } + + // ACK 처리 + String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class); + if (receiptHandle != null) { + deleteMessage(receiptHandle); + log.info("Message deleted successfully."); + } else { + log.warn("Receipt handle not found for the message."); + } + + } catch (ClientException e) { + // 특정 에러 코드가 TOO_MANY_REQUESTS인 경우에만 처리 + if (e.getClientErrorCode() == ClientErrorCode.TOO_MANY_REQUESTS) { + // TOO_MANY_REQUESTS 에러에 대한 처리 로직 + log.info("Too many requests. Waiting for a while."); + SqsListenerControlService.requestStop(); + + } else { // 그 외의 경우에는 잘못된 형식이 온 것이므로 큐에서 제거. + String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class); + deleteMessage(receiptHandle); } } catch (Exception e) { // 예외 처리 - System.err.println("Failed to process message: " + e.getMessage()); + log.warn("Failed to process message: " + e.getMessage()); + SqsListenerControlService.requestStop(); + log.info("SQS Listener has been stopped."); + //TODO: 스캐줄러 걸어서 특정 시간 이후에 켜주기 } } - private void handleUserStreakRequest(UserStreakRequest request) { - System.out.println("Handling UserStreakRequest: " + request); - // 비즈니스 로직 처리 - // profileService.getUserProfileByNameTag(request.getId(), request.getTag()); - } + private void deleteMessage(String receiptHandle) { + // 메시지 삭제 요청 + DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() + .queueUrl("mejai-renewal-sqs") // SQS 큐 URL + .receiptHandle(receiptHandle) + .build(); - private void handleUserProfileRequest(UserProfileRequest request) { - System.out.println("Handling UserProfileRequest: " + request); - // 비즈니스 로직 처리 + sqsAsyncClient.deleteMessage(deleteMessageRequest).whenComplete((result, exception) -> { + if (exception != null) { + log.error("Failed to delete message: {}", exception.getMessage()); + } else { + log.info("Message successfully deleted from the queue."); + } + }); } } From 5719e6b52ba219d839bbd1c270551a66b9f0cd0b Mon Sep 17 00:00:00 2001 From: hawardShin Date: Tue, 24 Sep 2024 10:39:23 +0900 Subject: [PATCH 5/7] Feat: Set log --- .../mejai/mejaigg/summoner/service/ProfileService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java b/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java index 576a00e..459bd8a 100644 --- a/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java +++ b/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java @@ -12,7 +12,6 @@ import mejai.mejaigg.global.config.RiotProperties; import mejai.mejaigg.rank.domain.Rank; import mejai.mejaigg.rank.dto.RankDto; -import mejai.mejaigg.rank.repository.RankRepository; import mejai.mejaigg.riot.dto.AccountDto; import mejai.mejaigg.riot.dto.SummonerDto; import mejai.mejaigg.riot.service.RiotService; @@ -28,7 +27,6 @@ public class ProfileService { private final RiotService riotService; private final SummonerRepository summonerRepository; - private final RankRepository rankRepository; private final RiotProperties riotProperties; /** @@ -94,12 +92,15 @@ private Summoner initializeSummonerData(String name, String tag) { public UserProfileDto refreshUserProfileByNameTag(String name, String tag) { Summoner summoner = summonerRepository.findBySummonerNameAndTagLineAllIgnoreCase(name, tag).orElse(null); if (summoner == null) { + log.info("유저가 없어 초기화를 진행합니다."); summoner = initializeSummonerData(name, tag); } else { if (summoner.getUpdatedAt().plusHours(2).isAfter(LocalDateTime.now())) log.info("2시간이 지나지 않아 강제 업데이트를 할 수 없습니다."); - else + else { + log.info("2시간이 지나 강제 업데이트를 진행합니다."); updateUserDetails(summoner); + } } UserProfileDto userProfileDto = new UserProfileDto(); userProfileDto.setBySummoner(summoner, riotProperties.getResourceUrl()); From a8279667c6f429317f0a4bdf4281fdf830401ec3 Mon Sep 17 00:00:00 2001 From: hawardShin Date: Tue, 24 Sep 2024 10:40:52 +0900 Subject: [PATCH 6/7] Feat: change docker volume --- docker-compose.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index ff5d8b0..d56b2d4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,8 @@ services: - POSTGRES_PASSWORD=compose-postgres ports: - "5432:5432" + volumes: + - postgres-mejai-data:/var/lib/postgresql/data # 서비스 명 pgadmin: # 사용할 이미지 @@ -43,3 +45,5 @@ services: SPRING_DATASOURCE_PASSWORD: compose-postgres +volumes: + postgres-mejai-data: From f952db123aeb6eab5879b69cee34fa998f204b35 Mon Sep 17 00:00:00 2001 From: hawardShin Date: Tue, 24 Sep 2024 14:18:47 +0900 Subject: [PATCH 7/7] Feat: Set Scheduling --- .../mejai/mejaigg/scheduler/CronConfig.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 src/main/java/mejai/mejaigg/scheduler/CronConfig.java diff --git a/src/main/java/mejai/mejaigg/scheduler/CronConfig.java b/src/main/java/mejai/mejaigg/scheduler/CronConfig.java new file mode 100644 index 0000000..d3a0640 --- /dev/null +++ b/src/main/java/mejai/mejaigg/scheduler/CronConfig.java @@ -0,0 +1,18 @@ +package mejai.mejaigg.scheduler; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import mejai.mejaigg.messaging.sqs.listener.SqsListenerControlService; + +@Configuration +@EnableScheduling +@Component +public class CronConfig { + @Scheduled(cron = "0 * * * * *") // 1분마다 실행 + public void test() { + SqsListenerControlService.requestStart(); + } +}