Skip to content

Commit

Permalink
Merge pull request #19 from mejaiKR/feature/sqs
Browse files Browse the repository at this point in the history
Feature/sqs
  • Loading branch information
Hawardshin authored Sep 24, 2024
2 parents f8b54d3 + f952db1 commit eb6d614
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 29 deletions.
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
- POSTGRES_PASSWORD=compose-postgres
ports:
- "5432:5432"
volumes:
- postgres-mejai-data:/var/lib/postgresql/data
# 서비스 명
pgadmin:
# 사용할 이미지
Expand Down Expand Up @@ -43,3 +45,5 @@ services:
SPRING_DATASOURCE_PASSWORD: compose-postgres


volumes:
postgres-mejai-data:
5 changes: 2 additions & 3 deletions src/main/java/mejai/mejaigg/messaging/sqs/SqsController.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
public class SqsController {

private final MessageSender messageSender;
private final SqsListenerControlService listenerControlService;
private final ObjectMapper objectMapper;

@GetMapping("/post")
Expand All @@ -53,13 +52,13 @@ public List<UserStreakDto> streak(@RequestBody UserStreakRequest request) throws

@PostMapping("/start")
public String startListener() {
listenerControlService.startListener();
SqsListenerControlService.requestStart();
return "SQS Listener started.";
}

@PostMapping("/stop")
public String stopListener() {
listenerControlService.stopListener();
SqsListenerControlService.requestStop();
return "SQS Listener stopped.";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mejai.mejaigg.messaging.sqs.listener.MyMessageListener;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class SqsListenerConfig {
private final MyMessageListener myMessageListener;

Expand All @@ -21,14 +24,21 @@ public SqsMessageListenerContainer<Object> sqsMessageListenerContainer(SqsAsyncC
SqsMessageListenerContainerFactory<Object> factory = SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(options -> options
.maxConcurrentMessages(10)
.pollTimeout(Duration.ofSeconds(10)))
.acknowledgementMode(AcknowledgementMode.MANUAL) // 수동 확인 모드
.maxConcurrentMessages(10) // 최대 동시 메시지 처리 수
.pollTimeout(Duration.ofSeconds(10))) // 폴링 타임아웃
.build();
// SqsMessageListenerContainer 생성
SqsMessageListenerContainer<Object> container = factory.createContainer("mejai-renewal-sqs");

// MessageListener 설정
container.setMessageListener(myMessageListener);
container.setMessageListener((message) -> {
try {
myMessageListener.onMessage(message); // 메시지 처리 명시적으로 메시지 삭제 진행
} catch (Exception e) {
log.warn("Failed to process message: " + e.getMessage());
}
});
return container;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,24 @@

import io.awspring.cloud.sqs.listener.MessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import mejai.mejaigg.matchstreak.service.StreakService;
import mejai.mejaigg.riot.exception.ClientErrorCode;
import mejai.mejaigg.riot.exception.ClientException;
import mejai.mejaigg.summoner.dto.request.UserProfileRequest;
import mejai.mejaigg.summoner.dto.request.UserStreakRequest;
import mejai.mejaigg.summoner.service.ProfileService;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;

@RequiredArgsConstructor
@Component
@Slf4j
public class MyMessageListener implements MessageListener<Object> {
private final ObjectMapper objectMapper;
// private final ProfileService profileService;
private final ProfileService profileService;
private final StreakService streakService;
private final SqsAsyncClient sqsAsyncClient;

@Override
public void onMessage(Message<Object> message) {
Expand All @@ -24,25 +34,55 @@ public void onMessage(Message<Object> message) {
// 메시지 타입에 따라 처리
if (payload.contains("year") && payload.contains("month")) {
UserStreakRequest request = objectMapper.readValue(payload, UserStreakRequest.class);
handleUserStreakRequest(request);
streakService.refreshStreak(request);
} else {
UserProfileRequest request = objectMapper.readValue(payload, UserProfileRequest.class);
handleUserProfileRequest(request);
profileService.refreshUserProfileByNameTag(request.getId(),
request.getTag());
}

// ACK 처리
String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class);
if (receiptHandle != null) {
deleteMessage(receiptHandle);
log.info("Message deleted successfully.");
} else {
log.warn("Receipt handle not found for the message.");
}

} catch (ClientException e) {
// 특정 에러 코드가 TOO_MANY_REQUESTS인 경우에만 처리
if (e.getClientErrorCode() == ClientErrorCode.TOO_MANY_REQUESTS) {
// TOO_MANY_REQUESTS 에러에 대한 처리 로직
log.info("Too many requests. Waiting for a while.");
SqsListenerControlService.requestStop();

} else { // 그 외의 경우에는 잘못된 형식이 온 것이므로 큐에서 제거.
String receiptHandle = message.getHeaders().get("Sqs_ReceiptHandle", String.class);
deleteMessage(receiptHandle);
}
} catch (Exception e) {
// 예외 처리
System.err.println("Failed to process message: " + e.getMessage());
log.warn("Failed to process message: " + e.getMessage());
SqsListenerControlService.requestStop();
log.info("SQS Listener has been stopped.");
//TODO: 스캐줄러 걸어서 특정 시간 이후에 켜주기
}
}

private void handleUserStreakRequest(UserStreakRequest request) {
System.out.println("Handling UserStreakRequest: " + request);
// 비즈니스 로직 처리
// profileService.getUserProfileByNameTag(request.getId(), request.getTag());
}
private void deleteMessage(String receiptHandle) {
// 메시지 삭제 요청
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl("mejai-renewal-sqs") // SQS 큐 URL
.receiptHandle(receiptHandle)
.build();

private void handleUserProfileRequest(UserProfileRequest request) {
System.out.println("Handling UserProfileRequest: " + request);
// 비즈니스 로직 처리
sqsAsyncClient.deleteMessage(deleteMessageRequest).whenComplete((result, exception) -> {
if (exception != null) {
log.error("Failed to delete message: {}", exception.getMessage());
} else {
log.info("Message successfully deleted from the queue.");
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package mejai.mejaigg.messaging.sqs.listener;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
Expand All @@ -9,17 +11,23 @@
@Service
@RequiredArgsConstructor
@Slf4j
public class SqsListenerControlService {
private final SqsMessageListenerContainer<Object> sqsMessageListenerContainer;
public class SqsListenerControlService implements ApplicationContextAware {
private static ApplicationContext context;

public void stopListener() {
sqsMessageListenerContainer.stop();
log.info("SQS Listener has been stopped.");
@Override
public void setApplicationContext(ApplicationContext ctx) {
context = ctx;
}

public void startListener() {
sqsMessageListenerContainer.start();
log.info("SQS Listener has been started.");
public static void requestStop() {
SqsMessageListenerContainer<?> container = context.getBean(SqsMessageListenerContainer.class);
container.stop();
log.info("SQS Listener has been stopped due to an error during message processing.");
}

public static void requestStart() {
SqsMessageListenerContainer<?> container = context.getBean(SqsMessageListenerContainer.class);
container.start();
log.info("SQS Listener has been started.");
}
}
18 changes: 18 additions & 0 deletions src/main/java/mejai/mejaigg/scheduler/CronConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package mejai.mejaigg.scheduler;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import mejai.mejaigg.messaging.sqs.listener.SqsListenerControlService;

@Configuration
@EnableScheduling
@Component
public class CronConfig {
@Scheduled(cron = "0 * * * * *") // 1분마다 실행
public void test() {
SqsListenerControlService.requestStart();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import mejai.mejaigg.global.config.RiotProperties;
import mejai.mejaigg.rank.domain.Rank;
import mejai.mejaigg.rank.dto.RankDto;
import mejai.mejaigg.rank.repository.RankRepository;
import mejai.mejaigg.riot.dto.AccountDto;
import mejai.mejaigg.riot.dto.SummonerDto;
import mejai.mejaigg.riot.service.RiotService;
Expand All @@ -28,7 +27,6 @@ public class ProfileService {

private final RiotService riotService;
private final SummonerRepository summonerRepository;
private final RankRepository rankRepository;
private final RiotProperties riotProperties;

/**
Expand Down Expand Up @@ -94,12 +92,15 @@ private Summoner initializeSummonerData(String name, String tag) {
public UserProfileDto refreshUserProfileByNameTag(String name, String tag) {
Summoner summoner = summonerRepository.findBySummonerNameAndTagLineAllIgnoreCase(name, tag).orElse(null);
if (summoner == null) {
log.info("유저가 없어 초기화를 진행합니다.");
summoner = initializeSummonerData(name, tag);
} else {
if (summoner.getUpdatedAt().plusHours(2).isAfter(LocalDateTime.now()))
log.info("2시간이 지나지 않아 강제 업데이트를 할 수 없습니다.");
else
else {
log.info("2시간이 지나 강제 업데이트를 진행합니다.");
updateUserDetails(summoner);
}
}
UserProfileDto userProfileDto = new UserProfileDto();
userProfileDto.setBySummoner(summoner, riotProperties.getResourceUrl());
Expand Down

0 comments on commit eb6d614

Please sign in to comment.