diff --git a/docker-compose.yml b/docker-compose.yml index ff5d8b0..d56b2d4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,8 @@ services: - POSTGRES_PASSWORD=compose-postgres ports: - "5432:5432" + volumes: + - postgres-mejai-data:/var/lib/postgresql/data # 서비스 명 pgadmin: # 사용할 이미지 @@ -43,3 +45,5 @@ services: SPRING_DATASOURCE_PASSWORD: compose-postgres +volumes: + postgres-mejai-data: diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java b/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java index 4508a01..05b4fdd 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java @@ -27,7 +27,6 @@ public class SqsController { private final MessageSender messageSender; - private final SqsListenerControlService listenerControlService; private final ObjectMapper objectMapper; @GetMapping("/post") @@ -53,13 +52,13 @@ public List streak(@RequestBody UserStreakRequest request) throws @PostMapping("/start") public String startListener() { - listenerControlService.startListener(); + SqsListenerControlService.requestStart(); return "SQS Listener started."; } @PostMapping("/stop") public String stopListener() { - listenerControlService.stopListener(); + SqsListenerControlService.requestStop(); return "SQS Listener stopped."; } } diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java b/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java index 43b5805..3ad3c92 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/config/SqsListenerConfig.java @@ -7,12 +7,15 @@ import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import mejai.mejaigg.messaging.sqs.listener.MyMessageListener; import software.amazon.awssdk.services.sqs.SqsAsyncClient; @Configuration @RequiredArgsConstructor +@Slf4j public class SqsListenerConfig { private final MyMessageListener myMessageListener; @@ -21,14 +24,21 @@ public SqsMessageListenerContainer sqsMessageListenerContainer(SqsAsyncC SqsMessageListenerContainerFactory factory = SqsMessageListenerContainerFactory.builder() .sqsAsyncClient(sqsAsyncClient) .configure(options -> options - .maxConcurrentMessages(10) - .pollTimeout(Duration.ofSeconds(10))) + .acknowledgementMode(AcknowledgementMode.MANUAL) // 수동 확인 모드 + .maxConcurrentMessages(10) // 최대 동시 메시지 처리 수 + .pollTimeout(Duration.ofSeconds(10))) // 폴링 타임아웃 .build(); // SqsMessageListenerContainer 생성 SqsMessageListenerContainer container = factory.createContainer("mejai-renewal-sqs"); // MessageListener 설정 - container.setMessageListener(myMessageListener); + container.setMessageListener((message) -> { + try { + myMessageListener.onMessage(message); // 메시지 처리 명시적으로 메시지 삭제 진행 + } catch (Exception e) { + log.warn("Failed to process message: " + e.getMessage()); + } + }); return container; } diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java b/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java index 362e199..632861c 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/listener/MyMessageListener.java @@ -7,14 +7,24 @@ import io.awspring.cloud.sqs.listener.MessageListener; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import mejai.mejaigg.matchstreak.service.StreakService; +import mejai.mejaigg.riot.exception.ClientErrorCode; +import mejai.mejaigg.riot.exception.ClientException; import mejai.mejaigg.summoner.dto.request.UserProfileRequest; import mejai.mejaigg.summoner.dto.request.UserStreakRequest; +import mejai.mejaigg.summoner.service.ProfileService; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; @RequiredArgsConstructor @Component +@Slf4j public class MyMessageListener implements MessageListener { private final ObjectMapper objectMapper; - // private final ProfileService profileService; + private final ProfileService profileService; + private final StreakService streakService; + private final SqsAsyncClient sqsAsyncClient; @Override public void onMessage(Message message) { @@ -24,25 +34,55 @@ public void onMessage(Message message) { // 메시지 타입에 따라 처리 if (payload.contains("year") && payload.contains("month")) { UserStreakRequest request = objectMapper.readValue(payload, UserStreakRequest.class); - handleUserStreakRequest(request); + streakService.refreshStreak(request); } else { UserProfileRequest request = objectMapper.readValue(payload, UserProfileRequest.class); - handleUserProfileRequest(request); + profileService.refreshUserProfileByNameTag(request.getId(), + request.getTag()); + } + + // ACK 처리 + String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class); + if (receiptHandle != null) { + deleteMessage(receiptHandle); + log.info("Message deleted successfully."); + } else { + log.warn("Receipt handle not found for the message."); + } + + } catch (ClientException e) { + // 특정 에러 코드가 TOO_MANY_REQUESTS인 경우에만 처리 + if (e.getClientErrorCode() == ClientErrorCode.TOO_MANY_REQUESTS) { + // TOO_MANY_REQUESTS 에러에 대한 처리 로직 + log.info("Too many requests. Waiting for a while."); + SqsListenerControlService.requestStop(); + + } else { // 그 외의 경우에는 잘못된 형식이 온 것이므로 큐에서 제거. + String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class); + deleteMessage(receiptHandle); } } catch (Exception e) { // 예외 처리 - System.err.println("Failed to process message: " + e.getMessage()); + log.warn("Failed to process message: " + e.getMessage()); + SqsListenerControlService.requestStop(); + log.info("SQS Listener has been stopped."); + //TODO: 스캐줄러 걸어서 특정 시간 이후에 켜주기 } } - private void handleUserStreakRequest(UserStreakRequest request) { - System.out.println("Handling UserStreakRequest: " + request); - // 비즈니스 로직 처리 - // profileService.getUserProfileByNameTag(request.getId(), request.getTag()); - } + private void deleteMessage(String receiptHandle) { + // 메시지 삭제 요청 + DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() + .queueUrl("mejai-renewal-sqs") // SQS 큐 URL + .receiptHandle(receiptHandle) + .build(); - private void handleUserProfileRequest(UserProfileRequest request) { - System.out.println("Handling UserProfileRequest: " + request); - // 비즈니스 로직 처리 + sqsAsyncClient.deleteMessage(deleteMessageRequest).whenComplete((result, exception) -> { + if (exception != null) { + log.error("Failed to delete message: {}", exception.getMessage()); + } else { + log.info("Message successfully deleted from the queue."); + } + }); } } diff --git a/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java b/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java index 839c20e..031884f 100644 --- a/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java +++ b/src/main/java/mejai/mejaigg/messaging/sqs/listener/SqsListenerControlService.java @@ -1,5 +1,7 @@ package mejai.mejaigg.messaging.sqs.listener; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer; @@ -9,17 +11,23 @@ @Service @RequiredArgsConstructor @Slf4j -public class SqsListenerControlService { - private final SqsMessageListenerContainer sqsMessageListenerContainer; +public class SqsListenerControlService implements ApplicationContextAware { + private static ApplicationContext context; - public void stopListener() { - sqsMessageListenerContainer.stop(); - log.info("SQS Listener has been stopped."); + @Override + public void setApplicationContext(ApplicationContext ctx) { + context = ctx; } - public void startListener() { - sqsMessageListenerContainer.start(); - log.info("SQS Listener has been started."); + public static void requestStop() { + SqsMessageListenerContainer container = context.getBean(SqsMessageListenerContainer.class); + container.stop(); + log.info("SQS Listener has been stopped due to an error during message processing."); } + public static void requestStart() { + SqsMessageListenerContainer container = context.getBean(SqsMessageListenerContainer.class); + container.start(); + log.info("SQS Listener has been started."); + } } diff --git a/src/main/java/mejai/mejaigg/scheduler/CronConfig.java b/src/main/java/mejai/mejaigg/scheduler/CronConfig.java new file mode 100644 index 0000000..d3a0640 --- /dev/null +++ b/src/main/java/mejai/mejaigg/scheduler/CronConfig.java @@ -0,0 +1,18 @@ +package mejai.mejaigg.scheduler; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import mejai.mejaigg.messaging.sqs.listener.SqsListenerControlService; + +@Configuration +@EnableScheduling +@Component +public class CronConfig { + @Scheduled(cron = "0 * * * * *") // 1분마다 실행 + public void test() { + SqsListenerControlService.requestStart(); + } +} diff --git a/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java b/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java index 576a00e..459bd8a 100644 --- a/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java +++ b/src/main/java/mejai/mejaigg/summoner/service/ProfileService.java @@ -12,7 +12,6 @@ import mejai.mejaigg.global.config.RiotProperties; import mejai.mejaigg.rank.domain.Rank; import mejai.mejaigg.rank.dto.RankDto; -import mejai.mejaigg.rank.repository.RankRepository; import mejai.mejaigg.riot.dto.AccountDto; import mejai.mejaigg.riot.dto.SummonerDto; import mejai.mejaigg.riot.service.RiotService; @@ -28,7 +27,6 @@ public class ProfileService { private final RiotService riotService; private final SummonerRepository summonerRepository; - private final RankRepository rankRepository; private final RiotProperties riotProperties; /** @@ -94,12 +92,15 @@ private Summoner initializeSummonerData(String name, String tag) { public UserProfileDto refreshUserProfileByNameTag(String name, String tag) { Summoner summoner = summonerRepository.findBySummonerNameAndTagLineAllIgnoreCase(name, tag).orElse(null); if (summoner == null) { + log.info("유저가 없어 초기화를 진행합니다."); summoner = initializeSummonerData(name, tag); } else { if (summoner.getUpdatedAt().plusHours(2).isAfter(LocalDateTime.now())) log.info("2시간이 지나지 않아 강제 업데이트를 할 수 없습니다."); - else + else { + log.info("2시간이 지나 강제 업데이트를 진행합니다."); updateUserDetails(summoner); + } } UserProfileDto userProfileDto = new UserProfileDto(); userProfileDto.setBySummoner(summoner, riotProperties.getResourceUrl());