diff --git a/README.md b/README.md index f67118e7..8c40e4ee 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ The following topics are going to be covered in this 1st stage (other stages top - Adding automated tests of microservices in isolation. - Adding semi-automated tests to a microservice landscape. -### System Boundary - μServices Landscape (Release 3) +### System Boundary - μServices Landscape (Release 4) ![System Boundary](docs/stage1/app_ms_landscape.png) @@ -69,7 +69,7 @@ I recommend that you work with your Java code using an IDE that supports the dev All that you want to do is just fire up your IDE **->** open or import the parent folder `springy-store-microservices` and everything will be ready for you. -## Playing With Spring Store Project +## Playing With Springy Store Project ### Cloning It @@ -156,29 +156,94 @@ All build commands and test suite for each microservice should run successfully, ``` ### Running Them All -Now it's the time to run all of them, and it's very simple just run the following *docker compose* commands: +#### Using RabbitMQ without the use of partitions +Now it's the time to run all of our reactive Microservices, and it's very simple just run the + following +`docker-compose` commands: ```bash mohamed.taman@DTLNV8 ~/springy-store-microservices λ docker-compose -p ssm up -d ``` -All the **services** and **databases** will run in parallel in detached mode (option `-d`), and their output will be printed to the console as the following: +All the **services**, **databases**, and **messaging service** will run in parallel in detach + mode + (option `-d`), and + command output will print to the console the following: ```bash Creating network "ssm_default" with the default driver -Creating ssm_mysql_1 ... done -Creating ssm_mongodb_1 ... done -Creating ssm_store_1 ... done +Creating ssm_mysql_1 ... done +Creating ssm_mongodb_1 ... done +Creating ssm_rabbitmq_1 ... done +Creating ssm_store_1 ... done Creating ssm_review_1 ... done Creating ssm_product_1 ... done Creating ssm_recommendation_1 ... done ``` ### Access Store APIs -You can manually test `Store Service` APIs through out its **Swagger** interface at the following +You can manually test `Store Service` APIs throughout its **Swagger** interface at the following URL [http://localhost:8080/swagger-ui.html](http://localhost:8080/swagger-ui.html). - +#### Access RabbitMQ +In browser point to this URL [http://localhost:5672/](http://localhost:5672/) `username: guest +` and `password: guest`, and you can see all **topics**, **DLQs**, **partitions**, and payload. + +1. For running 2 instances of each service and using _RabbitMQ with two partitions per topic_, use + the following + `docker-compose` command: + ```bash + mohamed.taman@DTLNV8 ~/springy-store-microservices + λ docker-compose -p ssm -f docker-compose-partitions.yml up -d + ``` + 1. To use _Kafka and Zookeeper with two partitions per topic_ run the following + command: + ```bash + mohamed.taman@DTLNV8 ~/springy-store-microservices + λ docker-compose -p ssm -f docker-compose-kafka.yml up -d + ``` + +#### Check All Services Health +From Store front Service we can check all the core services health, when you have all the + microservices up and running using Docker Compose, +```bash +mohamed.taman@DTLNV8 ~/springy-store-microservices +λ curl http://localhost:8080/actuator/health -s | jq . +``` +This will result in the following response: +```json +{ + "status":"UP", + "components":{ + "Core System Microservices":{ + "status":"UP", + "components":{ + "Product Service":{ + "status":"UP" + }, + "Recommendation Service":{ + "status":"UP" + }, + "Review Service":{ + "status":"UP" + } + } + }, + "diskSpace":{ + "status":"UP", + "details":{ + "total":255382777856, + "free":86618931200, + "threshold":10485760, + "exists":true + } + }, + "ping":{ + "status":"UP" + } + } +} +``` ### Testing Them All Now it's time to test all the application functionality as one part. To do so just run the following automation test script: @@ -188,7 +253,7 @@ mohamed.taman@DTLNV8 ~/springy-store-microservices λ ./test-em-all.sh ``` -The result should be something like this: +The result will look like this: ```bash Starting [Springy Store] full functionality testing.... @@ -227,10 +292,10 @@ Finally, to close the story, we will need to shut down Microservices manually se ```bash mohamed.taman@DTLNV8 ~/springy-store-microservices -λ docker-compose -p ssm down +λ docker-compose -p ssm down --remove-orphans ``` - And the output should be as the following: + And you should see output like the following: ```bash Stopping ssm_recommendation_1 ... done @@ -239,12 +304,14 @@ Stopping ssm_review_1 ... done Stopping ssm_mongodb_1 ... done Stopping ssm_store_1 ... done Stopping ssm_mysql_1 ... done +Stopping ssm_rabbitmq_1 ... done Removing ssm_recommendation_1 ... done Removing ssm_product_1 ... done Removing ssm_review_1 ... done Removing ssm_mongodb_1 ... done Removing ssm_store_1 ... done Removing ssm_mysql_1 ... done +Removing ssm_rabbitmq_1 ... done Removing network ssm_default ``` diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml new file mode 100644 index 00000000..73eb6249 --- /dev/null +++ b/docker-compose-kafka.yml @@ -0,0 +1,169 @@ +version: '3.7' ## Latest version works with Docker Engine release 18.06.0+ + +services: + ## Start - Product service definition + ### Instance 1 + product: + build: product-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=0 + depends_on: + - mongodb + - kafka + ### Instance 2 + product-i1: + build: product-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=1 + depends_on: + - mongodb + - kafka + ## End - Product service definition + + ## Start - Recommendation service definition + ### Instance 1 + recommendation: + build: recommendation-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=0 + depends_on: + - mongodb + - kafka + ### Instance 2 + recommendation-i1: + build: recommendation-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=1 + depends_on: + - mongodb + - kafka + ## End - Recommendation service definition + + ## Start - Review service definition + ### Instance 1 + review: + build: review-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=0 + depends_on: + - mysql + - kafka + restart: on-failure + ### Instance 2 + review-i1: + build: review-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=1 + depends_on: + - mysql + - kafka + restart: on-failure + ## End - Review service definition + + ## Start - Store service definition + store: + build: store-service + ports: + - "8080:8080" + environment: + - SPRING_PROFILES_ACTIVE=docker + - MANAGEMENT_HEALTH_RABBIT_ENABLED=false + - SPRING_CLOUD_STREAM_DEFAULTBINDER=kafka + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-PRODUCTS_PRODUCER_PARTITION-KEY-EXPRESSION=payload.key + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-PRODUCTS_PRODUCER_PARTITION-COUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-RECOMMENDATIONS_PRODUCER_PARTITION-KEY-EXPRESSION=payload.key + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-RECOMMENDATIONS_PRODUCER_PARTITION-COUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-REVIEWS_PRODUCER_PARTITION-KEY-EXPRESSION=payload.key + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-REVIEWS_PRODUCER_PARTITION-COUNT=2 + depends_on: + - kafka + ## End - Store service definition + + ## Start - mongodb database definition + ### $ mongo + mongodb: + image: mongo:4.2.5-bionic + ports: + - "27017-27019:27017-27019" + healthcheck: + test: "mongo --eval 'db.stats().ok'" + interval: 10s + timeout: 10s + retries: 5 + start_period: 40s + restart: on-failure + ## End - mongodb database definition + + ## Start - MySql database definition + ### $ mysql -uroot -h127.0.0.1 -p + mysql: + image: mysql:8.0.19 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=rootpwd + - MYSQL_DATABASE=review-db + - MYSQL_USER=user + - MYSQL_PASSWORD=pwd + - MYSQL_ROOT_HOST=% + healthcheck: + test: "/usr/bin/mysql --user=user --password=pwd --execute \"SHOW DATABASES;\"" + interval: 10s + timeout: 5s + retries: 10 + restart: on-failure + ## End - MySql database definition + + ## Start - Kafka Messaging service + kafka: + image: wurstmeister/kafka:latest + ports: + - "9092:9092" + environment: + - KAFKA_ADVERTISED_HOST_NAME=kafka + - KAFKA_ADVERTISED_PORT=9092 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + depends_on: + - zookeeper + restart: on-failure + ## End - Kafka Messaging service + + ## Start - Zookeeper (Kafka) cluster management service + zookeeper: + image: wurstmeister/zookeeper:latest + ports: + - "2181:2181" + environment: + - KAFKA_ADVERTISED_HOST_NAME=zookeeper + restart: on-failure + ## End - Zookeeper cluster management service \ No newline at end of file diff --git a/docker-compose-partitions.yml b/docker-compose-partitions.yml new file mode 100644 index 00000000..2851cb31 --- /dev/null +++ b/docker-compose-partitions.yml @@ -0,0 +1,145 @@ +version: '3.7' ## Latest version works with Docker Engine release 18.06.0+ + +services: + ## Start - Product service definition + ### Instance 1 + product: + build: product-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=0 + depends_on: + - mongodb + - rabbitmq + ### Instance 2 + product-i1: + build: product-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=1 + depends_on: + - mongodb + - rabbitmq + ## End - Product service definition + + ## Start - Recommendation service definition + ### Instance 1 + recommendation: + build: recommendation-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=0 + depends_on: + - mongodb + - rabbitmq + ### Instance 2 + recommendation-i1: + build: recommendation-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=1 + depends_on: + - mongodb + - rabbitmq + ## End - Recommendation service definition + + ## Start - Review service definition + ### Instance 1 + review: + build: review-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=0 + depends_on: + - mysql + - rabbitmq + restart: on-failure + ### Instance 2 + review-i1: + build: review-service + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_PARTITIONED=true + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCECOUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_INPUT_CONSUMER_INSTANCEINDEX=1 + depends_on: + - mysql + - rabbitmq + restart: on-failure + ## End - Review service definition + + ## Start - Store service definition + store: + build: store-service + ports: + - "8080:8080" + environment: + - SPRING_PROFILES_ACTIVE=docker + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-PRODUCTS_PRODUCER_PARTITION-KEY-EXPRESSION=payload.key + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-PRODUCTS_PRODUCER_PARTITION-COUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-RECOMMENDATIONS_PRODUCER_PARTITION-KEY-EXPRESSION=payload.key + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-RECOMMENDATIONS_PRODUCER_PARTITION-COUNT=2 + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-REVIEWS_PRODUCER_PARTITION-KEY-EXPRESSION=payload.key + - SPRING_CLOUD_STREAM_BINDINGS_OUTPUT-REVIEWS_PRODUCER_PARTITION-COUNT=2 + depends_on: + - rabbitmq + ## End - Store service definition + + ## Start - mongodb database definition + ### $ mongo + mongodb: + image: mongo:4.2.5-bionic + ports: + - "27017-27019:27017-27019" + healthcheck: + test: "mongo --eval 'db.stats().ok'" + interval: 10s + timeout: 10s + retries: 5 + start_period: 40s + restart: on-failure + ## End - mongodb database definition + + ## Start - MySql database definition + ### $ mysql -uroot -h127.0.0.1 -p + mysql: + image: mysql:8.0.19 + ports: + - "3306:3306" + environment: + - MYSQL_ROOT_PASSWORD=rootpwd + - MYSQL_DATABASE=review-db + - MYSQL_USER=user + - MYSQL_PASSWORD=pwd + - MYSQL_ROOT_HOST=% + healthcheck: + test: "/usr/bin/mysql --user=user --password=pwd --execute \"SHOW DATABASES;\"" + interval: 10s + timeout: 5s + retries: 10 + restart: on-failure + ## End - MySql database definition + + ## Start - RabbitMQ Messaging service + rabbitmq: + image: rabbitmq:3.8.3-management + ports: + - 5672:5672 + - 15672:15672 + healthcheck: + test: ["CMD", "rabbitmqctl", "status"] + interval: 10s + timeout: 5s + retries: 10 + restart: on-failure + ## End - RabbitMQ Messaging service diff --git a/docker-compose.yml b/docker-compose.yml index 1b69f94f..af37bd1c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,49 +1,65 @@ version: '3.7' ## Latest version works with Docker Engine release 18.06.0+ services: + ## Start - Product service definition product: build: product-service environment: - SPRING_PROFILES_ACTIVE=docker depends_on: - mongodb + - rabbitmq + ## End - Product service definition + ## Start - Recommendation service definition recommendation: build: recommendation-service environment: - SPRING_PROFILES_ACTIVE=docker depends_on: - mongodb + - rabbitmq + ## End - Recommendation service definition + ## Start - Review service definition review: build: review-service environment: - SPRING_PROFILES_ACTIVE=docker depends_on: - mysql + - rabbitmq restart: on-failure + ## End - Review service definition + ## Start - Store service definition store: build: store-service ports: - "8080:8080" environment: - SPRING_PROFILES_ACTIVE=docker + depends_on: + - rabbitmq + ## End - Store service definition - # $ mongo + ## Start - mongodb database definition + ### $ mongo mongodb: image: mongo:4.2.5-bionic ports: - "27017-27019:27017-27019" healthcheck: - test: echo 'db.runCommand("ping").ok' | mongo mongodb:27017/test --quiet 1 + test: "mongo --eval 'db.stats().ok'" interval: 10s timeout: 10s retries: 5 start_period: 40s - restart: always + restart: on-failure + ## End - mongodb database definition - # $ mysql -uroot -h127.0.0.1 -p + ## Start - MySql database definition + ### $ mysql -uroot -h127.0.0.1 -p mysql: image: mysql:8.0.19 ports: @@ -55,8 +71,23 @@ services: - MYSQL_PASSWORD=pwd - MYSQL_ROOT_HOST=% healthcheck: - test: ["CMD", "mysqladmin" ,"ping", "-uuser", "-ppwd", "-h", "localhost"] + test: "/usr/bin/mysql --user=user --password=pwd --execute \"SHOW DATABASES;\"" + interval: 10s + timeout: 5s + retries: 10 + restart: on-failure + ## End - MySql database definition + + ## Start - RabbitMQ Messaging service + rabbitmq: + image: rabbitmq:3.8.3-management + ports: + - 5672:5672 + - 15672:15672 + healthcheck: + test: ["CMD", "rabbitmqctl", "status"] interval: 10s timeout: 5s retries: 10 - restart: always + restart: on-failure + ## End - RabbitMQ Messaging service diff --git a/docs/stage1/app_ms_landscape.png b/docs/stage1/app_ms_landscape.png index cc328749..98452ac4 100644 Binary files a/docs/stage1/app_ms_landscape.png and b/docs/stage1/app_ms_landscape.png differ diff --git a/product-service/pom.xml b/product-service/pom.xml index 0ee5fff0..217958df 100644 --- a/product-service/pom.xml +++ b/product-service/pom.xml @@ -21,7 +21,7 @@ org.springframework.boot - spring-boot-starter-data-mongodb + spring-boot-starter-data-mongodb-reactive diff --git a/product-service/src/main/java/com/siriusxi/ms/store/ps/api/ProductController.java b/product-service/src/main/java/com/siriusxi/ms/store/ps/api/ProductController.java index 1e549f1b..fb0ecc83 100644 --- a/product-service/src/main/java/com/siriusxi/ms/store/ps/api/ProductController.java +++ b/product-service/src/main/java/com/siriusxi/ms/store/ps/api/ProductController.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; /** * Class ProductController is the implementation of the main Product Endpoint API @@ -14,7 +15,7 @@ * * @see ProductEndpoint * @author mohamed.taman - * @version v1.0 + * @version v4.0 * @since v3.0 codename Storm */ @RestController @@ -31,19 +32,7 @@ public ProductController(@Qualifier("ProductServiceImpl") ProductService prodSer /** {@inheritDoc} */ @Override - public Product getProduct(int id) { + public Mono getProduct(int id) { return prodService.getProduct(id); } - - /** {@inheritDoc} */ - @Override - public Product createProduct(Product body) { - return prodService.createProduct(body); - } - - /** {@inheritDoc} */ - @Override - public void deleteProduct(int id) { - prodService.deleteProduct(id); - } } diff --git a/product-service/src/main/java/com/siriusxi/ms/store/ps/infra/MessageProcessor.java b/product-service/src/main/java/com/siriusxi/ms/store/ps/infra/MessageProcessor.java new file mode 100644 index 00000000..6eb1f9dc --- /dev/null +++ b/product-service/src/main/java/com/siriusxi/ms/store/ps/infra/MessageProcessor.java @@ -0,0 +1,53 @@ +package com.siriusxi.ms.store.ps.infra; + +import com.siriusxi.ms.store.api.core.product.ProductService; +import com.siriusxi.ms.store.api.core.product.dto.Product; +import com.siriusxi.ms.store.api.event.Event; +import com.siriusxi.ms.store.util.exceptions.EventProcessingException; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; + +@EnableBinding(Sink.class) +@Log4j2 +public class MessageProcessor { + + private final ProductService productService; + + @Autowired + public MessageProcessor(@Qualifier("ProductServiceImpl") ProductService productService) { + this.productService = productService; + } + + @StreamListener(target = Sink.INPUT) + public void process(Event event) { + + log.info("Process message created at {}...", event.getEventCreatedAt()); + + switch (event.getEventType()) { + case CREATE -> { + Product product = event.getData(); + log.info("Create product with ID: {}", product.getProductId()); + productService.createProduct(product); + } + case DELETE -> { + log.info("Delete recommendations with Product Id: {}", event.getKey()); + productService.deleteProduct(event.getKey()); + } + default -> { + String errorMessage = + "Incorrect event type: " + .concat(event.getEventType().toString()) + .concat(", expected a CREATE or DELETE event."); + log.warn(errorMessage); + throw new EventProcessingException(errorMessage); + } + } + + log.info("Message processing done!"); + } + +} diff --git a/product-service/src/main/java/com/siriusxi/ms/store/ps/persistence/ProductRepository.java b/product-service/src/main/java/com/siriusxi/ms/store/ps/persistence/ProductRepository.java index 8b8e35b4..24c97717 100644 --- a/product-service/src/main/java/com/siriusxi/ms/store/ps/persistence/ProductRepository.java +++ b/product-service/src/main/java/com/siriusxi/ms/store/ps/persistence/ProductRepository.java @@ -1,12 +1,11 @@ package com.siriusxi.ms.store.ps.persistence; -import org.springframework.data.repository.PagingAndSortingRepository; +import org.springframework.data.repository.reactive.ReactiveCrudRepository; import org.springframework.stereotype.Repository; - -import java.util.Optional; +import reactor.core.publisher.Mono; @Repository -public interface ProductRepository extends PagingAndSortingRepository { +public interface ProductRepository extends ReactiveCrudRepository { - Optional findByProductId(int productId); + Mono findByProductId(int productId); } diff --git a/product-service/src/main/java/com/siriusxi/ms/store/ps/service/ProductServiceImpl.java b/product-service/src/main/java/com/siriusxi/ms/store/ps/service/ProductServiceImpl.java index 03dce467..c57681e9 100644 --- a/product-service/src/main/java/com/siriusxi/ms/store/ps/service/ProductServiceImpl.java +++ b/product-service/src/main/java/com/siriusxi/ms/store/ps/service/ProductServiceImpl.java @@ -2,7 +2,6 @@ import com.siriusxi.ms.store.api.core.product.ProductService; import com.siriusxi.ms.store.api.core.product.dto.Product; -import com.siriusxi.ms.store.ps.persistence.ProductEntity; import com.siriusxi.ms.store.ps.persistence.ProductRepository; import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import com.siriusxi.ms.store.util.exceptions.NotFoundException; @@ -11,6 +10,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import static reactor.core.publisher.Mono.error; @Service("ProductServiceImpl") @Log4j2 @@ -32,34 +34,33 @@ public ProductServiceImpl( @Override public Product createProduct(Product body) { - try { - ProductEntity entity = mapper.apiToEntity(body); - ProductEntity newEntity = repository.save(entity); - log.debug("createProduct: entity created for productId: {}", body.getProductId()); - return mapper.entityToApi(newEntity); + isValidProductId(body.getProductId()); - } catch (DuplicateKeyException dke) { - throw new InvalidInputException("Duplicate key, Product Id: " + body.getProductId()); - } + return repository + .save(mapper.apiToEntity(body)) + .log() + .onErrorMap( + DuplicateKeyException.class, + ex -> new InvalidInputException("Duplicate key, Product Id: " + body.getProductId())) + .map(mapper::entityToApi) + .block(); } @Override - public Product getProduct(int productId) { - if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId); - - ProductEntity entity = - repository - .findByProductId(productId) - .orElseThrow( - () -> new NotFoundException("No product found for productId: " + productId)); + public Mono getProduct(int productId) { - Product response = mapper.entityToApi(entity); - response.setServiceAddress(serviceUtil.getServiceAddress()); + isValidProductId(productId); - log.debug("getProduct: found productId: {}", response.getProductId()); - - return response; + return repository + .findByProductId(productId) + .switchIfEmpty(error(new NotFoundException("No product found for productId: " + productId))) + .log() + .map(mapper::entityToApi) + .map(e -> { + e.setServiceAddress(serviceUtil.getServiceAddress()); + return e; + }); } /* @@ -68,7 +69,21 @@ public Product getProduct(int productId) { */ @Override public void deleteProduct(int productId) { + + isValidProductId(productId); + log.debug("deleteProduct: tries to delete an entity with productId: {}", productId); - repository.findByProductId(productId).ifPresent(repository::delete); + + repository + .findByProductId(productId) + .log() + .map(repository::delete) + .flatMap(e -> e) + .block(); + } + + // TODO Cloud be added to utilities class to be used by all core services implementations. + private void isValidProductId(int productId) { + if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId); } } diff --git a/product-service/src/main/resources/application.yaml b/product-service/src/main/resources/application.yaml index 3e6b6175..065c913a 100644 --- a/product-service/src/main/resources/application.yaml +++ b/product-service/src/main/resources/application.yaml @@ -7,6 +7,39 @@ spring: port: 27017 database: product-db auto-index-creation: true + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: guest + password: guest + cloud: + stream: + defaultBinder: rabbit + default: + contentType: application/json + bindings: + input: + destination: products + group: productsGroup + consumer: + maxAttempts: 3 + backOffInitialInterval: 500 + backOffMaxInterval: 1000 + backOffMultiplier: 2.0 + rabbit: + bindings: + input: + consumer: + autoBindDlq: true + republishToDlq: true + kafka: + bindings: + input: + consumer: + enableDlq: true + binder: + brokers: 127.0.0.1 + defaultBrokerPort: 9092 server: port: 9081 @@ -16,9 +49,14 @@ logging: web: DEBUG root: INFO com.siriusxi.ms.store: DEBUG - org.springframework.data.mongodb.core.MongoTemplate: DEBUG + org: + springframework.data.mongodb.core.MongoTemplate: DEBUG + mongodb: debug management: + info: + git: + mode: full endpoints: web: exposure: @@ -26,6 +64,8 @@ management: endpoint: shutdown: enabled: true + health: + show-details: always # This is a docker specific profile properties # Also profiles could be separated in its owen file @@ -38,6 +78,13 @@ spring: data: mongodb: host: mongodb + rabbitmq: + host: rabbitmq + cloud: + stream: + kafka: + binder: + brokers: kafka server: port: 8080 diff --git a/product-service/src/test/java/com/siriusxi/ms/store/ps/MapperTests.java b/product-service/src/test/java/com/siriusxi/ms/store/ps/MapperTests.java index c8e7ea51..dd91971a 100644 --- a/product-service/src/test/java/com/siriusxi/ms/store/ps/MapperTests.java +++ b/product-service/src/test/java/com/siriusxi/ms/store/ps/MapperTests.java @@ -7,7 +7,7 @@ import static org.junit.jupiter.api.Assertions.*; -public class MapperTests { +class MapperTests { private final ProductMapper mapper = ProductMapper.INSTANCE; diff --git a/product-service/src/test/java/com/siriusxi/ms/store/ps/PersistenceTests.java b/product-service/src/test/java/com/siriusxi/ms/store/ps/PersistenceTests.java index 650b9148..b01257c4 100644 --- a/product-service/src/test/java/com/siriusxi/ms/store/ps/PersistenceTests.java +++ b/product-service/src/test/java/com/siriusxi/ms/store/ps/PersistenceTests.java @@ -2,27 +2,16 @@ import com.siriusxi.ms.store.ps.persistence.ProductEntity; import com.siriusxi.ms.store.ps.persistence.ProductRepository; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.OptimisticLockingFailureException; -import org.springframework.data.domain.Page; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Pageable; - -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import static java.util.stream.IntStream.rangeClosed; -import static org.junit.jupiter.api.Assertions.*; -import static org.springframework.data.domain.Sort.Direction.ASC; +import reactor.test.StepVerifier; @DataMongoTest -public class PersistenceTests { +class PersistenceTests { @Autowired private ProductRepository repository; @@ -30,125 +19,110 @@ public class PersistenceTests { @BeforeEach public void setupDb() { - repository.deleteAll(); + + StepVerifier.create(repository.deleteAll()).verifyComplete(); ProductEntity entity = new ProductEntity(1, "n", 1); - savedEntity = repository.save(entity); - assertEqualsProduct(entity, savedEntity); + StepVerifier.create(repository.save(entity)) + .expectNextMatches( + createdEntity -> { + savedEntity = createdEntity; + return areProductEqual(entity, savedEntity); + }) + .verifyComplete(); } @Test public void create() { var newEntity = new ProductEntity(2, "n", 2); - repository.save(newEntity); - var foundEntity = repository.findById(newEntity.getId()).orElse(new ProductEntity()); - assertEqualsProduct(newEntity, foundEntity); + StepVerifier.create(repository.save(newEntity)) + .expectNextMatches( + createdEntity -> newEntity.getProductId() == createdEntity.getProductId()) + .verifyComplete(); + + StepVerifier.create(repository.findById(newEntity.getId())) + .expectNextMatches(foundEntity -> areProductEqual(newEntity, foundEntity)) + .verifyComplete(); - assertEquals(2, repository.count()); + StepVerifier.create(repository.count()).expectNext(2L).verifyComplete(); } @Test public void update() { savedEntity.setName("n2"); - repository.save(savedEntity); - var foundEntity = repository.findById(savedEntity.getId()).orElse(new ProductEntity()); - assertEquals(1, (long) foundEntity.getVersion()); - assertEquals("n2", foundEntity.getName()); + StepVerifier.create(repository.save(savedEntity)) + .expectNextMatches(savedEntity -> savedEntity.getName().equals("n2")) + .verifyComplete(); + + StepVerifier.create(repository.findById(savedEntity.getId())) + .expectNextMatches( + foundEntity -> foundEntity.getVersion().equals(1) && foundEntity.getName().equals("n2")) + .verifyComplete(); } @Test public void delete() { - repository.delete(savedEntity); - assertFalse(repository.existsById(savedEntity.getId())); + StepVerifier.create(repository.delete(savedEntity)).verifyComplete(); + + StepVerifier.create(repository.existsById(savedEntity.getId())) + .expectNext(false) + .verifyComplete(); } @Test public void getByProductId() { - Optional entity = repository.findByProductId(savedEntity.getProductId()); - assertTrue(entity.isPresent()); - assertEqualsProduct(savedEntity, entity.get()); + StepVerifier.create(repository.findByProductId(savedEntity.getProductId())) + .expectNextMatches(foundEntity -> areProductEqual(savedEntity, foundEntity)) + .verifyComplete(); } @Test public void duplicateError() { - - Assertions.assertThrows( - DuplicateKeyException.class, - () -> { - ProductEntity entity = new ProductEntity(savedEntity.getProductId(), "n", 1); - repository.save(entity); - }); + StepVerifier.create(repository.save(new ProductEntity(savedEntity.getProductId(), "n", 1))) + .expectError(DuplicateKeyException.class) + .verify(); } @Test public void optimisticLockError() { // Store the saved entity in two separate entity objects - ProductEntity entity1 = repository.findById(savedEntity.getId()).orElse(new ProductEntity()), - entity2 = repository.findById(savedEntity.getId()).orElse(new ProductEntity()); + ProductEntity entity1 = repository.findById(savedEntity.getId()).block(), + entity2 = repository.findById(savedEntity.getId()).block(); // Update the entity using the first entity object + assert entity1 != null; entity1.setName("n1"); - repository.save(entity1); + repository.save(entity1).block(); /* Update the entity using the second entity object. This should fail since the second entity now holds a old version number, i.e. a Optimistic Lock Error. */ - try { - entity2.setName("n2"); - repository.save(entity2); + assert entity2 != null; - fail("Expected an OptimisticLockingFailureException"); - } catch (OptimisticLockingFailureException ignored) { } + StepVerifier.create(repository.save(entity2)) + .expectError(OptimisticLockingFailureException.class) + .verify(); // Get the updated entity from the database and verify its new sate - var updatedEntity = repository.findById(savedEntity.getId()).orElse(new ProductEntity()); - assertEquals(1, (int) updatedEntity.getVersion()); - assertEquals("n1", updatedEntity.getName()); - } - - @Test - public void paging() { - - repository.deleteAll(); - - List newProducts = - rangeClosed(1001, 1010) - .mapToObj(i -> new ProductEntity(i, "name " + i, i)) - .collect(Collectors.toList()); - repository.saveAll(newProducts); - - Pageable nextPage = PageRequest.of(0, 4, ASC, "productId"); - nextPage = testNextPage(nextPage, "[1001, 1002, 1003, 1004]", true); - nextPage = testNextPage(nextPage, "[1005, 1006, 1007, 1008]", true); - testNextPage(nextPage, "[1009, 1010]", false); - } - - private Pageable testNextPage( - Pageable nextPage, String expectedProductIds, boolean expectsNextPage) { - Page productPage = repository.findAll(nextPage); - assertEquals( - expectedProductIds, - productPage.getContent().stream() - .map(ProductEntity::getProductId) - .collect(Collectors.toList()) - .toString()); - assertEquals(expectsNextPage, productPage.hasNext()); - return productPage.nextPageable(); + StepVerifier.create(repository.findById(savedEntity.getId())) + .expectNextMatches( + foundEntity -> foundEntity.getVersion() == 1 && foundEntity.getName().equals("n1")) + .verifyComplete(); } - private void assertEqualsProduct(ProductEntity expectedEntity, ProductEntity actualEntity) { - assertEquals(expectedEntity.getId(), actualEntity.getId()); - assertEquals(expectedEntity.getVersion(), actualEntity.getVersion()); - assertEquals(expectedEntity.getProductId(), actualEntity.getProductId()); - assertEquals(expectedEntity.getName(), actualEntity.getName()); - assertEquals(expectedEntity.getWeight(), actualEntity.getWeight()); + private boolean areProductEqual(ProductEntity expectedEntity, ProductEntity actualEntity) { + return (expectedEntity.getId().equals(actualEntity.getId())) + && (expectedEntity.getVersion().equals(actualEntity.getVersion())) + && (expectedEntity.getProductId() == actualEntity.getProductId()) + && (expectedEntity.getName().equals(actualEntity.getName())) + && (expectedEntity.getWeight() == actualEntity.getWeight()); } } diff --git a/product-service/src/test/java/com/siriusxi/ms/store/ps/ProductServiceApplicationTests.java b/product-service/src/test/java/com/siriusxi/ms/store/ps/ProductServiceApplicationTests.java index e8c7af7e..15c80139 100644 --- a/product-service/src/test/java/com/siriusxi/ms/store/ps/ProductServiceApplicationTests.java +++ b/product-service/src/test/java/com/siriusxi/ms/store/ps/ProductServiceApplicationTests.java @@ -1,17 +1,23 @@ package com.siriusxi.ms.store.ps; import com.siriusxi.ms.store.api.core.product.dto.Product; +import com.siriusxi.ms.store.api.event.Event; import com.siriusxi.ms.store.ps.persistence.ProductRepository; +import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Sink; import org.springframework.http.HttpStatus; +import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.GenericMessage; import org.springframework.test.web.reactive.server.WebTestClient; -import reactor.core.publisher.Mono; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static com.siriusxi.ms.store.api.event.Event.Type.CREATE; +import static com.siriusxi.ms.store.api.event.Event.Type.DELETE; +import static org.junit.jupiter.api.Assertions.*; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import static org.springframework.http.HttpStatus.*; import static org.springframework.http.MediaType.APPLICATION_JSON; @@ -23,13 +29,21 @@ class ProductServiceApplicationTests { private final String BASE_URI = "/products/"; - @Autowired private WebTestClient client; + @Autowired + private WebTestClient client; - @Autowired private ProductRepository repository; + @Autowired + private ProductRepository repository; + + @Autowired + private Sink channels; + + private AbstractMessageChannel input = null; @BeforeEach public void setupDb() { - repository.deleteAll(); + input = (AbstractMessageChannel) channels.input(); + repository.deleteAll().block(); } @Test @@ -37,11 +51,16 @@ public void getProductById() { int productId = 1; - postAndVerifyProduct(productId, OK); + assertNull(repository.findByProductId(productId).block()); + assertEquals(0, repository.count().block()); - assertTrue(repository.findByProductId(productId).isPresent()); + sendCreateProductEvent(productId); - getAndVerifyProduct(productId, OK).jsonPath("$.productId").isEqualTo(productId); + assertNotNull(repository.findByProductId(productId).block()); + assertEquals(1, repository.count().block()); + + getAndVerifyProduct(productId, OK) + .jsonPath("$.productId").isEqualTo(productId); } @Test @@ -49,15 +68,22 @@ public void duplicateError() { int productId = 1; - postAndVerifyProduct(productId, OK); + assertNull(repository.findByProductId(productId).block()); - assertTrue(repository.findByProductId(productId).isPresent()); + sendCreateProductEvent(productId); - postAndVerifyProduct(productId, UNPROCESSABLE_ENTITY) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Duplicate key, Product Id: " + productId); + assertNotNull(repository.findByProductId(productId).block()); + + try { + sendCreateProductEvent(productId); + fail("Expected a MessagingException here!"); + } catch (MessagingException me) { + if (me.getCause() instanceof InvalidInputException iie) { + assertEquals("Duplicate key, Product Id: " + productId, iie.getMessage()); + } else { + fail("Expected a InvalidInputException as the root cause!"); + } + } } @Test @@ -65,23 +91,19 @@ public void deleteProduct() { int productId = 1; - postAndVerifyProduct(productId, OK); - assertTrue(repository.findByProductId(productId).isPresent()); - - deleteAndVerifyProduct(productId); - assertFalse(repository.findByProductId(productId).isPresent()); + sendCreateProductEvent(productId); + assertNotNull(repository.findByProductId(productId).block()); - deleteAndVerifyProduct(productId); + sendDeleteProductEvent(productId); + assertNull(repository.findByProductId(productId).block()); } @Test public void getProductInvalidParameterString() { getAndVerifyProduct(BASE_URI + "/no-integer", BAD_REQUEST) - .jsonPath("$.path") - .isEqualTo(BASE_URI + "no-integer") - .jsonPath("$.message") - .isEqualTo("Type mismatch."); + .jsonPath("$.path").isEqualTo(BASE_URI + "no-integer") + .jsonPath("$.message").isEqualTo("Type mismatch."); } @Test @@ -89,10 +111,9 @@ public void getProductNotFound() { int productIdNotFound = 13; getAndVerifyProduct(productIdNotFound, NOT_FOUND) - .jsonPath("$.path") - .isEqualTo(BASE_URI + productIdNotFound) + .jsonPath("$.path").isEqualTo(BASE_URI + productIdNotFound) .jsonPath("$.message") - .isEqualTo("No product found for productId: " + productIdNotFound); + .isEqualTo("No product found for productId: " + productIdNotFound); } @Test @@ -101,10 +122,8 @@ public void getProductInvalidParameterNegativeValue() { int productIdInvalid = -1; getAndVerifyProduct(productIdInvalid, UNPROCESSABLE_ENTITY) - .jsonPath("$.path") - .isEqualTo(BASE_URI + productIdInvalid) - .jsonPath("$.message") - .isEqualTo("Invalid productId: " + productIdInvalid); + .jsonPath("$.path").isEqualTo(BASE_URI + productIdInvalid) + .jsonPath("$.message").isEqualTo("Invalid productId: " + productIdInvalid); } private WebTestClient.BodyContentSpec getAndVerifyProduct( @@ -119,37 +138,19 @@ private WebTestClient.BodyContentSpec getAndVerifyProduct( .uri(productIdPath) .accept(APPLICATION_JSON) .exchange() - .expectStatus() - .isEqualTo(expectedStatus) - .expectHeader() - .contentType(APPLICATION_JSON) + .expectStatus().isEqualTo(expectedStatus) + .expectHeader().contentType(APPLICATION_JSON) .expectBody(); } - private WebTestClient.BodyContentSpec postAndVerifyProduct( - int productId, HttpStatus expectedStatus) { + private void sendCreateProductEvent(int productId) { Product product = new Product(productId, "Name " + productId, productId, "SA"); - return client - .post() - .uri(BASE_URI) - .body(Mono.just(product), Product.class) - .accept(APPLICATION_JSON) - .exchange() - .expectStatus() - .isEqualTo(expectedStatus) - .expectHeader() - .contentType(APPLICATION_JSON) - .expectBody(); + Event event = new Event<>(CREATE, productId, product); + input.send(new GenericMessage<>(event)); } - private void deleteAndVerifyProduct(int productId) { - client - .delete() - .uri(BASE_URI + productId) - .accept(APPLICATION_JSON) - .exchange() - .expectStatus() - .isEqualTo(OK) - .expectBody(); + private void sendDeleteProductEvent(int productId) { + Event event = new Event<>(DELETE, productId, null); + input.send(new GenericMessage<>(event)); } } diff --git a/recommendation-service/pom.xml b/recommendation-service/pom.xml index 8dcc2a4a..a18c5c02 100644 --- a/recommendation-service/pom.xml +++ b/recommendation-service/pom.xml @@ -21,7 +21,7 @@ org.springframework.boot - spring-boot-starter-data-mongodb + spring-boot-starter-data-mongodb-reactive diff --git a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/api/RecommendationController.java b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/api/RecommendationController.java index a96f96a1..191e43d8 100644 --- a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/api/RecommendationController.java +++ b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/api/RecommendationController.java @@ -7,8 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RestController; - -import java.util.List; +import reactor.core.publisher.Flux; /** * Class RecommendationController is the implementation of the main Recommendation @@ -16,12 +15,13 @@ * * @see RecommendationEndpoint * @author mohamed.taman - * @version v1.0 + * @version v4.0 * @since v3.0 codename Storm */ @RestController @Log4j2 public class RecommendationController implements RecommendationEndpoint { + /** Recommendation service business logic interface. */ private final RecommendationService recommendationService; @@ -33,19 +33,7 @@ public RecommendationController( /** {@inheritDoc} */ @Override - public List getRecommendations(int productId) { + public Flux getRecommendations(int productId) { return recommendationService.getRecommendations(productId); } - - /** {@inheritDoc} */ - @Override - public Recommendation createRecommendation(Recommendation body) { - return recommendationService.createRecommendation(body); - } - - /** {@inheritDoc} */ - @Override - public void deleteRecommendations(int productId) { - recommendationService.deleteRecommendations(productId); - } } diff --git a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/infra/MessageProcessor.java b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/infra/MessageProcessor.java new file mode 100644 index 00000000..6000edfa --- /dev/null +++ b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/infra/MessageProcessor.java @@ -0,0 +1,57 @@ +package com.siriusxi.ms.store.rs.infra; + +import com.siriusxi.ms.store.api.core.recommendation.RecommendationService; +import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; +import com.siriusxi.ms.store.api.event.Event; +import com.siriusxi.ms.store.util.exceptions.EventProcessingException; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; + +import static java.lang.String.*; + +@EnableBinding(Sink.class) +@Log4j2 +public class MessageProcessor { + + private final RecommendationService service; + + @Autowired + public MessageProcessor(@Qualifier("RecommendationServiceImpl") RecommendationService service) { + this.service = service; + } + + @StreamListener(target = Sink.INPUT) + public void process(Event event) { + + log.info("Process message created at {}...", event.getEventCreatedAt()); + + switch (event.getEventType()) { + case CREATE -> { + Recommendation recommendation = event.getData(); + log.info("Create recommendation with ID: {}/{}", recommendation.getProductId(), + recommendation.getRecommendationId()); + service.createRecommendation(recommendation); + } + case DELETE -> { + int productId = event.getKey(); + log.info("Delete recommendations with ProductID: {}", productId); + service.deleteRecommendations(productId); + } + default -> { + String errorMessage = + "Incorrect event type: " + .concat(valueOf(event.getEventType())) + .concat(", expected a CREATE or DELETE event"); + log.warn(errorMessage); + throw new EventProcessingException(errorMessage); + } + } + + log.info("Message processing done!"); + } + +} diff --git a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationEntity.java b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationEntity.java index eafeefb3..dfc33662 100644 --- a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationEntity.java +++ b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationEntity.java @@ -7,6 +7,8 @@ import org.springframework.data.mongodb.core.index.CompoundIndex; import org.springframework.data.mongodb.core.mapping.Document; +import static java.lang.String.format; + @Document(collection = "recommendations") @CompoundIndex( name = "prod-rec-id", diff --git a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationRepository.java b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationRepository.java index 520f0e90..4f8bf598 100644 --- a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationRepository.java +++ b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/persistence/RecommendationRepository.java @@ -1,11 +1,14 @@ package com.siriusxi.ms.store.rs.persistence; -import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.reactive.ReactiveCrudRepository; import org.springframework.stereotype.Repository; +import reactor.core.publisher.Flux; -import java.util.List; +//FIXME to be documented and to be pageable @Repository -public interface RecommendationRepository extends CrudRepository { - List findByProductId(int productId); +public interface RecommendationRepository + extends ReactiveCrudRepository { + + Flux findByProductId(int productId); } diff --git a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/service/RecommendationServiceImpl.java b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/service/RecommendationServiceImpl.java index 3cabadde..55d2dc83 100644 --- a/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/service/RecommendationServiceImpl.java +++ b/recommendation-service/src/main/java/com/siriusxi/ms/store/rs/service/RecommendationServiceImpl.java @@ -2,7 +2,6 @@ import com.siriusxi.ms.store.api.core.recommendation.RecommendationService; import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; -import com.siriusxi.ms.store.rs.persistence.RecommendationEntity; import com.siriusxi.ms.store.rs.persistence.RecommendationRepository; import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import com.siriusxi.ms.store.util.http.ServiceUtil; @@ -10,8 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; - -import java.util.List; +import reactor.core.publisher.Flux; @Service("RecommendationServiceImpl") @Log4j2 @@ -33,46 +31,55 @@ public RecommendationServiceImpl( @Override public Recommendation createRecommendation(Recommendation body) { - try { - RecommendationEntity entity = mapper.apiToEntity(body); - RecommendationEntity newEntity = repository.save(entity); - - log.debug( - "createRecommendation: created a recommendation entity: {}/{}", - body.getProductId(), - body.getRecommendationId()); - - return mapper.entityToApi(newEntity); - - } catch (DuplicateKeyException dke) { - throw new InvalidInputException( - "Duplicate key, Product Id: " - + body.getProductId() - + ", Recommendation Id:" - + body.getRecommendationId()); - } + + isValidProductId(body.getProductId()); + + return repository + .save(mapper.apiToEntity(body)) + .log() + .onErrorMap( + DuplicateKeyException.class, + ex -> new InvalidInputException("Duplicate key, Product Id: " + + body.getProductId() + ", Recommendation Id:" + + body.getRecommendationId())) + .map(mapper::entityToApi).block(); } @Override - public List getRecommendations(int productId) { + public Flux getRecommendations(int productId) { - if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId); + isValidProductId(productId); - List entityList = repository.findByProductId(productId); - List list = mapper.entityListToApiList(entityList); - list.forEach(e -> e.setServiceAddress(serviceUtil.getServiceAddress())); + return repository + .findByProductId(productId) + .log() + .map(mapper::entityToApi) + .map(e -> { + e.setServiceAddress(serviceUtil.getServiceAddress()); + return e; + }); - log.debug("getRecommendations: response size: {}", list.size()); - - return list; + //FIXME check how to add log to flux + //log.debug("getRecommendations: response size: {}", list.size()); } @Override public void deleteRecommendations(int productId) { + isValidProductId(productId); + log.debug( - "deleteRecommendations: tries to delete recommendations for the product with " - + "productId: {}", + """ + deleteRecommendations: tries to delete recommendations + for the product with productId: {} + """, productId); - repository.deleteAll(repository.findByProductId(productId)); + + repository + .deleteAll(repository.findByProductId(productId)) + .block(); + } + + private void isValidProductId(int productId) { + if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId); } } diff --git a/recommendation-service/src/main/resources/application.yaml b/recommendation-service/src/main/resources/application.yaml index 02460b21..a71f35ba 100644 --- a/recommendation-service/src/main/resources/application.yaml +++ b/recommendation-service/src/main/resources/application.yaml @@ -7,6 +7,39 @@ spring: port: 27017 database: recommendation-db auto-index-creation: true + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: guest + password: guest + cloud: + stream: + defaultBinder: rabbit + default: + contentType: application/json + bindings: + input: + destination: recommendations + group: recommendationsGroup + consumer: + maxAttempts: 3 + backOffInitialInterval: 500 + backOffMaxInterval: 1000 + backOffMultiplier: 2.0 + rabbit: + bindings: + input: + consumer: + autoBindDlq: true + republishToDlq: true + kafka: + bindings: + input: + consumer: + enableDlq: true + binder: + brokers: 127.0.0.1 + defaultBrokerPort: 9092 server: port: 9082 @@ -21,6 +54,9 @@ logging: mongodb: debug management: + info: + git: + mode: full endpoints: web: exposure: @@ -28,6 +64,8 @@ management: endpoint: shutdown: enabled: true + health: + show-details: always # This is a docker specific profile properties # Also profiles could be separated in its owen file @@ -40,6 +78,13 @@ spring: data: mongodb: host: mongodb + rabbitmq: + host: rabbitmq + cloud: + stream: + kafka: + binder: + brokers: kafka server: port: 8080 diff --git a/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/MapperTests.java b/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/MapperTests.java index 3642918b..a2e2952d 100644 --- a/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/MapperTests.java +++ b/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/MapperTests.java @@ -10,7 +10,7 @@ import static org.junit.jupiter.api.Assertions.*; -public class MapperTests { +class MapperTests { private final RecommendationMapper mapper = RecommendationMapper.INSTANCE; diff --git a/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/PersistenceTests.java b/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/PersistenceTests.java index f42eb5e8..46e4f4ac 100644 --- a/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/PersistenceTests.java +++ b/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/PersistenceTests.java @@ -17,7 +17,7 @@ import static org.junit.jupiter.api.Assertions.*; @DataMongoTest -public class PersistenceTests { +class PersistenceTests { @Autowired private RecommendationRepository repository; @@ -25,11 +25,12 @@ public class PersistenceTests { @BeforeEach public void setupDb() { - repository.deleteAll(); + repository.deleteAll().block(); RecommendationEntity entity = new RecommendationEntity(1, 2, "a", 3, "c"); - savedEntity = repository.save(entity); + savedEntity = repository.save(entity).block(); + assert savedEntity != null; assertEqualsRecommendation(entity, savedEntity); } @@ -37,33 +38,39 @@ public void setupDb() { public void create() { var newEntity = new RecommendationEntity(1, 3, "a", 3, "c"); - repository.save(newEntity); + repository.save(newEntity).block(); - var foundEntity = repository.findById(newEntity.getId()).orElse(new RecommendationEntity()); + var foundEntity = repository.findById(newEntity.getId()).block(); + + assert foundEntity != null; assertEqualsRecommendation(newEntity, foundEntity); - assertEquals(2, repository.count()); + assertEquals(2L, repository.count().block()); } @Test public void update() { savedEntity.setAuthor("a2"); - repository.save(savedEntity); + repository.save(savedEntity).block(); + + RecommendationEntity foundEntity = repository.findById(savedEntity.getId()).block(); + + assert foundEntity != null; - RecommendationEntity foundEntity = repository.findById(savedEntity.getId()).get(); - assertEquals(1, (long) foundEntity.getVersion()); + assertEquals(1, foundEntity.getVersion()); assertEquals("a2", foundEntity.getAuthor()); } @Test public void delete() { - repository.delete(savedEntity); - assertFalse(repository.existsById(savedEntity.getId())); + repository.delete(savedEntity).block(); + assertEquals(false, repository.existsById(savedEntity.getId()).block()); } @Test public void getByProductId() { - List entityList = repository.findByProductId(savedEntity.getProductId()); + List entityList = + repository.findByProductId(savedEntity.getProductId()).collectList().block(); assertThat(entityList, hasSize(1)); assertEqualsRecommendation(savedEntity, entityList.get(0)); @@ -74,20 +81,23 @@ public void duplicateError() { Assertions.assertThrows( DuplicateKeyException.class, - () -> repository.save(new RecommendationEntity(1, 2, "a", 3, "c"))); + () -> repository + .save(new RecommendationEntity(1, 2, "a", 3, "c")) + .block()); } @Test public void optimisticLockError() { // Store the saved entity in two separate entity objects - RecommendationEntity - entity1 = repository.findById(savedEntity.getId()).orElse(new RecommendationEntity()), - entity2 = repository.findById(savedEntity.getId()).orElse(new RecommendationEntity()); + RecommendationEntity entity1 = repository.findById(savedEntity.getId()).block(), + entity2 = repository.findById(savedEntity.getId()).block(); // Update the entity using the first entity object + assert entity1 != null; + entity1.setAuthor("a1"); - repository.save(entity1); + repository.save(entity1).block(); /* Update the entity using the second entity object. @@ -95,17 +105,21 @@ public void optimisticLockError() { i.e. a Optimistic Lock Error. */ try { + assert entity2 != null; + entity2.setAuthor("a2"); - repository.save(entity2); + repository.save(entity2).block(); fail("Expected an OptimisticLockingFailureException"); } catch (OptimisticLockingFailureException ignored) { } // Get the updated entity from the database and verify its new sate - var updatedEntity = repository.findById(savedEntity.getId()).orElse(new RecommendationEntity()); + var updatedEntity = repository.findById(savedEntity.getId()).block(); + + assert updatedEntity != null; - assertEquals(1, (int) updatedEntity.getVersion()); + assertEquals(1, updatedEntity.getVersion()); assertEquals("a1", updatedEntity.getAuthor()); } diff --git a/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/RecommendationServiceApplicationTests.java b/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/RecommendationServiceApplicationTests.java index 53059e8b..8461b2dc 100644 --- a/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/RecommendationServiceApplicationTests.java +++ b/recommendation-service/src/test/java/com/siriusxi/ms/store/rs/RecommendationServiceApplicationTests.java @@ -1,16 +1,25 @@ package com.siriusxi.ms.store.rs; import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; +import com.siriusxi.ms.store.api.event.Event; import com.siriusxi.ms.store.rs.persistence.RecommendationRepository; +import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Sink; import org.springframework.http.HttpStatus; +import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.GenericMessage; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.test.web.reactive.server.WebTestClient.BodyContentSpec; import reactor.core.publisher.Mono; +import static com.siriusxi.ms.store.api.event.Event.Type.CREATE; +import static com.siriusxi.ms.store.api.event.Event.Type.DELETE; +import static org.assertj.core.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import static org.springframework.http.HttpStatus.*; @@ -22,13 +31,20 @@ class RecommendationServiceApplicationTests { private final String BASE_URI = "/recommendations"; + @Autowired private WebTestClient client; @Autowired private RecommendationRepository repository; + @Autowired + private Sink channels; + + private AbstractMessageChannel input = null; + @BeforeEach public void setupDb() { - repository.deleteAll(); + input = (AbstractMessageChannel) channels.input(); + repository.deleteAll().block(); } @Test @@ -36,19 +52,16 @@ public void getRecommendationsByProductId() { int productId = 1; - postAndVerifyRecommendation(productId, 1, OK); - postAndVerifyRecommendation(productId, 2, OK); - postAndVerifyRecommendation(productId, 3, OK); + sendCreateRecommendationEvent(productId, 1); + sendCreateRecommendationEvent(productId, 2); + sendCreateRecommendationEvent(productId, 3); - assertEquals(3, repository.findByProductId(productId).size()); + assertEquals(3, repository.findByProductId(productId).count().block()); - getAndVerifyRecommendationsByProductId(productId, OK) - .jsonPath("$.length()") - .isEqualTo(3) - .jsonPath("$[2].productId") - .isEqualTo(productId) - .jsonPath("$[2].recommendationId") - .isEqualTo(3); + getAndVerifyRecommendationsByProductId(productId) + .jsonPath("$.length()").isEqualTo(3) + .jsonPath("$[2].productId").isEqualTo(productId) + .jsonPath("$[2].recommendationId").isEqualTo(3); } @Test @@ -57,21 +70,22 @@ public void duplicateError() { int productId = 1; int recommendationId = 1; - postAndVerifyRecommendation(productId, recommendationId, OK) - .jsonPath("$.productId") - .isEqualTo(productId) - .jsonPath("$.recommendationId") - .isEqualTo(recommendationId); + sendCreateRecommendationEvent(productId, recommendationId); - assertEquals(1, repository.count()); + assertEquals(1, repository.count().block()); - postAndVerifyRecommendation(productId, recommendationId, UNPROCESSABLE_ENTITY) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Duplicate key, Product Id: 1, Recommendation Id:1"); + try { + sendCreateRecommendationEvent(productId, recommendationId); + fail("Expected a MessagingException here!"); + } catch (MessagingException me) { + if (me.getCause() instanceof InvalidInputException iie) { + assertEquals("Duplicate key, Product Id: 1, Recommendation Id:1", iie.getMessage()); + } else { + fail("Expected a InvalidInputException as the root cause!"); + } + } - assertEquals(1, repository.count()); + assertEquals(1, repository.count().block()); } @Test @@ -80,41 +94,35 @@ public void deleteRecommendations() { int productId = 1; int recommendationId = 1; - postAndVerifyRecommendation(productId, recommendationId, OK); - assertEquals(1, repository.findByProductId(productId).size()); - - deleteAndVerifyRecommendationsByProductIdIsOk(productId); - assertEquals(0, repository.findByProductId(productId).size()); + sendCreateRecommendationEvent(productId, recommendationId); + assertEquals(1, repository.findByProductId(productId).count().block()); - deleteAndVerifyRecommendationsByProductIdIsOk(productId); + sendDeleteRecommendationEvent(productId); + assertEquals(0, repository.findByProductId(productId).count().block()); } @Test public void getRecommendationsMissingParameter() { getAndVerifyRecommendationsByProductId("", BAD_REQUEST) - .jsonPath("$.path") - .isEqualTo(BASE_URI) + .jsonPath("$.path").isEqualTo(BASE_URI) .jsonPath("$.message") - .isEqualTo("Required int parameter 'productId' is not present"); + .isEqualTo("Required int parameter 'productId' is not present"); } @Test public void getRecommendationsInvalidParameter() { getAndVerifyRecommendationsByProductId("?productId=no-integer", BAD_REQUEST) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Type mismatch."); + .jsonPath("$.path").isEqualTo(BASE_URI) + .jsonPath("$.message").isEqualTo("Type mismatch."); } @Test public void getRecommendationsNotFound() { getAndVerifyRecommendationsByProductId("?productId=113", OK) - .jsonPath("$.length()") - .isEqualTo(0); + .jsonPath("$.length()").isEqualTo(0); } @Test @@ -122,16 +130,15 @@ public void getRecommendationsInvalidParameterNegativeValue() { int productIdInvalid = -1; - getAndVerifyRecommendationsByProductId("?productId=" + productIdInvalid, UNPROCESSABLE_ENTITY) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Invalid productId: " + productIdInvalid); + getAndVerifyRecommendationsByProductId("?productId=" + productIdInvalid, + UNPROCESSABLE_ENTITY) + .jsonPath("$.path").isEqualTo(BASE_URI) + .jsonPath("$.message").isEqualTo("Invalid productId: " + productIdInvalid); } private BodyContentSpec getAndVerifyRecommendationsByProductId( - int productId, HttpStatus expectedStatus) { - return getAndVerifyRecommendationsByProductId("?productId=" + productId, expectedStatus); + int productId) { + return getAndVerifyRecommendationsByProductId("?productId=" + productId, HttpStatus.OK); } private BodyContentSpec getAndVerifyRecommendationsByProductId( @@ -148,39 +155,14 @@ private BodyContentSpec getAndVerifyRecommendationsByProductId( .expectBody(); } - private BodyContentSpec postAndVerifyRecommendation( - int productId, int recommendationId, HttpStatus expectedStatus) { - - Recommendation recommendation = - new Recommendation( - productId, - recommendationId, - "Author " + recommendationId, - recommendationId, - "Content " + recommendationId, - "SA"); - - return client - .post() - .uri(BASE_URI) - .body(Mono.just(recommendation), Recommendation.class) - .accept(APPLICATION_JSON) - .exchange() - .expectStatus() - .isEqualTo(expectedStatus) - .expectHeader() - .contentType(APPLICATION_JSON) - .expectBody(); + private void sendCreateRecommendationEvent(int productId, int recommendationId) { + Recommendation recommendation = new Recommendation(productId, recommendationId, "Author " + recommendationId, recommendationId, "Content " + recommendationId, "SA"); + Event event = new Event<>(CREATE, productId, recommendation); + input.send(new GenericMessage<>(event)); } - private void deleteAndVerifyRecommendationsByProductIdIsOk(int productId) { - client - .delete() - .uri(BASE_URI + "?productId=" + productId) - .accept(APPLICATION_JSON) - .exchange() - .expectStatus() - .isEqualTo(OK) - .expectBody(); + private void sendDeleteRecommendationEvent(int productId) { + Event event = new Event<>(DELETE, productId, null); + input.send(new GenericMessage<>(event)); } -} +} \ No newline at end of file diff --git a/review-service/src/main/java/com/siriusxi/ms/store/revs/api/ReviewController.java b/review-service/src/main/java/com/siriusxi/ms/store/revs/api/ReviewController.java index 0e078c89..8c0ceebd 100644 --- a/review-service/src/main/java/com/siriusxi/ms/store/revs/api/ReviewController.java +++ b/review-service/src/main/java/com/siriusxi/ms/store/revs/api/ReviewController.java @@ -8,16 +8,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RestController; - -import java.util.List; +import reactor.core.publisher.Flux; /** * Class ReviewController is the implementation of the main Review Endpoint API * definition. * - * @see ProductEndpoint + * @see ReviewEndpoint * @author mohamed.taman - * @version v1.0 + * @version v4.0 * @since v3.0 codename Storm */ @RestController @@ -34,19 +33,7 @@ public ReviewController(@Qualifier("ReviewServiceImpl") ReviewService reviewServ /** {@inheritDoc} */ @Override - public Review createReview(Review body) { - return reviewService.createReview(body); - } - - /** {@inheritDoc} */ - @Override - public List getReviews(int productId) { + public Flux getReviews(int productId) { return reviewService.getReviews(productId); } - - /** {@inheritDoc} */ - @Override - public void deleteReviews(int productId) { - reviewService.deleteReviews(productId); - } } diff --git a/review-service/src/main/java/com/siriusxi/ms/store/revs/config/ReviewServiceConfiguration.java b/review-service/src/main/java/com/siriusxi/ms/store/revs/config/ReviewServiceConfiguration.java new file mode 100644 index 00000000..6a04b201 --- /dev/null +++ b/review-service/src/main/java/com/siriusxi/ms/store/revs/config/ReviewServiceConfiguration.java @@ -0,0 +1,27 @@ +package com.siriusxi.ms.store.revs.config; + +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import java.util.concurrent.Executors; + +@Configuration +@Log4j2 +public class ReviewServiceConfiguration { + + @Value("${spring.datasource.hikari.maximum-pool-size:10}") + Integer connectionPoolSize; + + @Bean + public Scheduler jdbcScheduler() { + log.info("Creates a jdbcScheduler with connectionPoolSize = {}", connectionPoolSize); + return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize)); + } + + + +} diff --git a/review-service/src/main/java/com/siriusxi/ms/store/revs/infra/MessageProcessor.java b/review-service/src/main/java/com/siriusxi/ms/store/revs/infra/MessageProcessor.java new file mode 100644 index 00000000..00a44cbf --- /dev/null +++ b/review-service/src/main/java/com/siriusxi/ms/store/revs/infra/MessageProcessor.java @@ -0,0 +1,57 @@ +package com.siriusxi.ms.store.revs.infra; + +import com.siriusxi.ms.store.api.core.review.ReviewService; +import com.siriusxi.ms.store.api.core.review.dto.Review; +import com.siriusxi.ms.store.api.event.Event; +import com.siriusxi.ms.store.util.exceptions.EventProcessingException; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; + +import static java.lang.String.valueOf; + +@EnableBinding(Sink.class) +@Log4j2 +public class MessageProcessor { + + private final ReviewService service; + + @Autowired + public MessageProcessor(@Qualifier("ReviewServiceImpl") ReviewService service) { + this.service = service; + } + + @StreamListener(target = Sink.INPUT) + public void process(Event event) { + + log.info("Process message created at {}...", event.getEventCreatedAt()); + + switch (event.getEventType()) { + case CREATE -> { + Review review = event.getData(); + log.info("Create review with ID: {}/{}", review.getProductId(), + review.getReviewId()); + service.createReview(review); + } + case DELETE -> { + int productId = event.getKey(); + log.info("Delete review with Product Id: {}", productId); + service.deleteReviews(productId); + } + default -> { + String errorMessage = + "Incorrect event type: " + .concat(valueOf(event.getEventType())) + .concat(", expected a CREATE or DELETE event"); + log.warn(errorMessage); + throw new EventProcessingException(errorMessage); + } + } + + log.info("Message processing done!"); + } + +} diff --git a/review-service/src/main/java/com/siriusxi/ms/store/revs/service/ReviewServiceImpl.java b/review-service/src/main/java/com/siriusxi/ms/store/revs/service/ReviewServiceImpl.java index b4e97209..b15af62c 100644 --- a/review-service/src/main/java/com/siriusxi/ms/store/revs/service/ReviewServiceImpl.java +++ b/review-service/src/main/java/com/siriusxi/ms/store/revs/service/ReviewServiceImpl.java @@ -7,11 +7,17 @@ import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import com.siriusxi.ms.store.util.http.ServiceUtil; import lombok.extern.log4j.Log4j2; +import org.reactivestreams.Publisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; import java.util.List; +import java.util.function.Supplier; + +import static java.util.logging.Level.FINE; @Service("ReviewServiceImpl") @Log4j2 @@ -20,17 +26,23 @@ public class ReviewServiceImpl implements ReviewService { private final ReviewRepository repository; private final ReviewMapper mapper; private final ServiceUtil serviceUtil; + private final Scheduler scheduler; @Autowired public ReviewServiceImpl( - ReviewRepository repository, ReviewMapper mapper, ServiceUtil serviceUtil) { + Scheduler scheduler, ReviewRepository repository, ReviewMapper mapper, + ServiceUtil serviceUtil) { this.repository = repository; this.mapper = mapper; this.serviceUtil = serviceUtil; + this.scheduler = scheduler; } @Override public Review createReview(Review body) { + + isValidProductId(body.getProductId()); + try { ReviewEntity entity = mapper.apiToEntity(body); ReviewEntity newEntity = repository.save(entity); @@ -49,13 +61,18 @@ public Review createReview(Review body) { } @Override - public List getReviews(int productId) { + public Flux getReviews(int productId) { - if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId); + isValidProductId(productId); + + return asyncFlux(() -> Flux.fromIterable(getByProductId(productId))).log(null, FINE); +} - List entityList = repository.findByProductId(productId); - List list = mapper.entityListToApiList(entityList); - list.forEach(e -> e.setServiceAddress(serviceUtil.getServiceAddress())); + protected List getByProductId(int productId) { + + List list = mapper.entityListToApiList(repository.findByProductId(productId)); + list.forEach(e -> + e.setServiceAddress(serviceUtil.getServiceAddress())); log.debug("getReviews: response size: {}", list.size()); @@ -64,8 +81,17 @@ public List getReviews(int productId) { @Override public void deleteReviews(int productId) { + isValidProductId(productId); log.debug( "deleteReviews: tries to delete reviews for the product with productId: {}", productId); repository.deleteAll(repository.findByProductId(productId)); } + + private void isValidProductId(int productId) { + if (productId < 1) throw new InvalidInputException("Invalid productId: " + productId); + } + + private Flux asyncFlux(Supplier> publisherSupplier) { + return Flux.defer(publisherSupplier).subscribeOn(scheduler); + } } diff --git a/review-service/src/main/resources/application.yaml b/review-service/src/main/resources/application.yaml index fec7238e..900dc7ee 100644 --- a/review-service/src/main/resources/application.yaml +++ b/review-service/src/main/resources/application.yaml @@ -1,7 +1,6 @@ spring: application: name: review-service - jpa: show-sql: true open-in-view: false @@ -24,7 +23,7 @@ spring: hikari: initializationFailTimeout: 60000 connection-test-query: SELECT 1 - + maximum-pool-size: 10 flyway: #Enable or disable flyway migrations enabled: true @@ -32,7 +31,39 @@ spring: schemas: review-db user: ${spring.datasource.username} password: ${spring.datasource.password} - + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: guest + password: guest + cloud: + stream: + defaultBinder: rabbit + default: + contentType: application/json + bindings: + input: + destination: reviews + group: reviewsGroup + consumer: + maxAttempts: 3 + backOffInitialInterval: 500 + backOffMaxInterval: 1000 + backOffMultiplier: 2.0 + rabbit: + bindings: + input: + consumer: + autoBindDlq: true + republishToDlq: true + kafka: + bindings: + input: + consumer: + enableDlq: true + binder: + brokers: 127.0.0.1 + defaultBrokerPort: 9092 server: port: 9083 @@ -57,6 +88,8 @@ management: endpoint: shutdown: enabled: true + health: + show-details: always # This is a docker specific profile properties # Also profiles could be separated in its owen file @@ -71,6 +104,13 @@ spring: flyway: url: "jdbc:mysql://mysql/review-db?useSSL=false&serverTimezone=UTC" + rabbitmq: + host: rabbitmq + cloud: + stream: + kafka: + binder: + brokers: kafka server: port: 8080 \ No newline at end of file diff --git a/review-service/src/test/java/com/siriusxi/ms/store/revs/MapperTests.java b/review-service/src/test/java/com/siriusxi/ms/store/revs/MapperTests.java index 0ebd7d11..eaa17b37 100644 --- a/review-service/src/test/java/com/siriusxi/ms/store/revs/MapperTests.java +++ b/review-service/src/test/java/com/siriusxi/ms/store/revs/MapperTests.java @@ -10,7 +10,7 @@ import static org.junit.jupiter.api.Assertions.*; -public class MapperTests { +class MapperTests { private final ReviewMapper mapper = ReviewMapper.INSTANCE; diff --git a/review-service/src/test/java/com/siriusxi/ms/store/revs/PersistenceTests.java b/review-service/src/test/java/com/siriusxi/ms/store/revs/PersistenceTests.java index eeca74c7..24b72893 100644 --- a/review-service/src/test/java/com/siriusxi/ms/store/revs/PersistenceTests.java +++ b/review-service/src/test/java/com/siriusxi/ms/store/revs/PersistenceTests.java @@ -24,7 +24,7 @@ @Transactional(propagation = NOT_SUPPORTED) @ActiveProfiles("test") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) -public class PersistenceTests { +class PersistenceTests { @Autowired private ReviewRepository repository; diff --git a/review-service/src/test/java/com/siriusxi/ms/store/revs/ReviewServiceApplicationTests.java b/review-service/src/test/java/com/siriusxi/ms/store/revs/ReviewServiceApplicationTests.java index 9e0c4f8e..d0f50eef 100644 --- a/review-service/src/test/java/com/siriusxi/ms/store/revs/ReviewServiceApplicationTests.java +++ b/review-service/src/test/java/com/siriusxi/ms/store/revs/ReviewServiceApplicationTests.java @@ -1,16 +1,25 @@ package com.siriusxi.ms.store.revs; import com.siriusxi.ms.store.api.core.review.dto.Review; +import com.siriusxi.ms.store.api.event.Event; import com.siriusxi.ms.store.revs.persistence.ReviewRepository; +import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Sink; import org.springframework.http.HttpStatus; +import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.GenericMessage; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.web.reactive.server.WebTestClient; import reactor.core.publisher.Mono; +import static com.siriusxi.ms.store.api.event.Event.Type.CREATE; +import static com.siriusxi.ms.store.api.event.Event.Type.DELETE; +import static org.assertj.core.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import static org.springframework.http.HttpStatus.*; @@ -28,8 +37,14 @@ class ReviewServiceApplicationTests { @Autowired private ReviewRepository repository; + @Autowired + private Sink channels; + + private AbstractMessageChannel input = null; + @BeforeEach public void setupDb() { + input = (AbstractMessageChannel) channels.input(); repository.deleteAll(); } @@ -40,19 +55,16 @@ public void getReviewsByProductId() { assertEquals(0, repository.findByProductId(productId).size()); - postAndVerifyReview(productId, 1, OK); - postAndVerifyReview(productId, 2, OK); - postAndVerifyReview(productId, 3, OK); + sendCreateReviewEvent(productId, 1); + sendCreateReviewEvent(productId, 2); + sendCreateReviewEvent(productId, 3); assertEquals(3, repository.findByProductId(productId).size()); - getAndVerifyReviewsByProductId(productId, OK) - .jsonPath("$.length()") - .isEqualTo(3) - .jsonPath("$[2].productId") - .isEqualTo(productId) - .jsonPath("$[2].reviewId") - .isEqualTo(3); + getAndVerifyReviewsByProductId(productId) + .jsonPath("$.length()").isEqualTo(3) + .jsonPath("$[2].productId").isEqualTo(productId) + .jsonPath("$[2].reviewId").isEqualTo(3); } @Test @@ -63,19 +75,20 @@ public void duplicateError() { assertEquals(0, repository.count()); - postAndVerifyReview(productId, reviewId, OK) - .jsonPath("$.productId") - .isEqualTo(productId) - .jsonPath("$.reviewId") - .isEqualTo(reviewId); + sendCreateReviewEvent(productId, reviewId); assertEquals(1, repository.count()); - postAndVerifyReview(productId, reviewId, UNPROCESSABLE_ENTITY) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Duplicate key, Product Id: 1, Review Id:1"); + try { + sendCreateReviewEvent(productId, reviewId); + fail("Expected a MessagingException here!"); + } catch (MessagingException me) { + if (me.getCause() instanceof InvalidInputException iie) { + assertEquals("Duplicate key, Product Id: 1, Review Id:1", iie.getMessage()); + } else { + fail("Expected a InvalidInputException as the root cause!"); + } + } assertEquals(1, repository.count()); } @@ -84,41 +97,39 @@ public void duplicateError() { public void deleteReviews() { int productId = 1; - int recommendationId = 1; + int reviewId = 1; - postAndVerifyReview(productId, recommendationId, OK); + sendCreateReviewEvent(productId, reviewId); assertEquals(1, repository.findByProductId(productId).size()); - deleteAndVerifyReviewsByProductId(productId); + sendDeleteReviewEvent(productId); assertEquals(0, repository.findByProductId(productId).size()); - deleteAndVerifyReviewsByProductId(productId); + sendDeleteReviewEvent(productId); } @Test public void getReviewsMissingParameter() { getAndVerifyReviewsByProductId("", BAD_REQUEST) - .jsonPath("$.path") - .isEqualTo(BASE_URI) + .jsonPath("$.path").isEqualTo(BASE_URI) .jsonPath("$.message") - .isEqualTo("Required int parameter 'productId' is not present"); + .isEqualTo("Required int parameter 'productId' is not present"); } @Test public void getReviewsInvalidParameter() { getAndVerifyReviewsByProductId("?productId=no-integer", BAD_REQUEST) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Type mismatch."); + .jsonPath("$.path").isEqualTo(BASE_URI) + .jsonPath("$.message").isEqualTo("Type mismatch."); } @Test public void getReviewsNotFound() { - getAndVerifyReviewsByProductId("?productId=213", OK).jsonPath("$.length()").isEqualTo(0); + getAndVerifyReviewsByProductId("?productId=213", OK) + .jsonPath("$.length()").isEqualTo(0); } @Test @@ -126,16 +137,15 @@ public void getReviewsInvalidParameterNegativeValue() { int productIdInvalid = -1; - getAndVerifyReviewsByProductId("?productId=" + productIdInvalid, UNPROCESSABLE_ENTITY) - .jsonPath("$.path") - .isEqualTo(BASE_URI) - .jsonPath("$.message") - .isEqualTo("Invalid productId: " + productIdInvalid); + getAndVerifyReviewsByProductId("?productId=" + productIdInvalid, + UNPROCESSABLE_ENTITY) + .jsonPath("$.path").isEqualTo(BASE_URI) + .jsonPath("$.message").isEqualTo("Invalid productId: " + productIdInvalid); } private WebTestClient.BodyContentSpec getAndVerifyReviewsByProductId( - int productId, HttpStatus expectedStatus) { - return getAndVerifyReviewsByProductId("?productId=" + productId, expectedStatus); + int productId) { + return getAndVerifyReviewsByProductId("?productId=" + productId, HttpStatus.OK); } private WebTestClient.BodyContentSpec getAndVerifyReviewsByProductId( @@ -145,44 +155,20 @@ private WebTestClient.BodyContentSpec getAndVerifyReviewsByProductId( .uri(BASE_URI + productIdQuery) .accept(APPLICATION_JSON) .exchange() - .expectStatus() - .isEqualTo(expectedStatus) - .expectHeader() - .contentType(APPLICATION_JSON) + .expectStatus().isEqualTo(expectedStatus) + .expectHeader().contentType(APPLICATION_JSON) .expectBody(); } - private WebTestClient.BodyContentSpec postAndVerifyReview( - int productId, int reviewId, HttpStatus expectedStatus) { - Review review = - new Review( - productId, - reviewId, - "Author " + reviewId, - "Subject " + reviewId, - "Content " + reviewId, - "SA"); - return client - .post() - .uri(BASE_URI) - .body(Mono.just(review), Review.class) - .accept(APPLICATION_JSON) - .exchange() - .expectStatus() - .isEqualTo(expectedStatus) - .expectHeader() - .contentType(APPLICATION_JSON) - .expectBody(); + private void sendCreateReviewEvent(int productId, int reviewId) { + Review review = new Review(productId, reviewId, "Author " + reviewId, + "Subject " + reviewId, "Content " + reviewId, "SA"); + Event event = new Event<>(CREATE, productId, review); + input.send(new GenericMessage<>(event)); } - private void deleteAndVerifyReviewsByProductId(int productId) { - client - .delete() - .uri(BASE_URI + "?productId=" + productId) - .accept(APPLICATION_JSON) - .exchange() - .expectStatus() - .isEqualTo(OK) - .expectBody(); + private void sendDeleteReviewEvent(int productId) { + Event event = new Event<>(DELETE, productId, null); + input.send(new GenericMessage<>(event)); } } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreEndpoint.java b/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreEndpoint.java index 2bea8e78..8e5142da 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreEndpoint.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreEndpoint.java @@ -6,6 +6,7 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Mono; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; @@ -53,7 +54,7 @@ public interface StoreEndpoint extends StoreService { }) @GetMapping(value = "products/{id}", produces = APPLICATION_JSON_VALUE) - ProductAggregate getProduct(@PathVariable int id); + Mono getProduct(@PathVariable int id); /** * Sample usage: diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreService.java b/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreService.java index 53d6aa68..61f9508d 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreService.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/composite/StoreService.java @@ -1,6 +1,7 @@ package com.siriusxi.ms.store.api.composite; import com.siriusxi.ms.store.api.composite.dto.ProductAggregate; +import reactor.core.publisher.Mono; /** * Interface that define the general service contract (methods) for the Store @@ -28,13 +29,14 @@ public interface StoreService { /** * Get the aggregate product with its reviews and recommendations and services involved in the * call. + * It is a Non-Blocking API. * * @see ProductAggregate * @param id is the product id that you are looking for. * @return the product, if found, else null. * @since v0.1 */ - ProductAggregate getProduct(int id); + Mono getProduct(int id); /** * Delete the product and all its relate reviews and recommendations from their repositories. diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductEndpoint.java b/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductEndpoint.java index b5ae4d31..2c44fccb 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductEndpoint.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductEndpoint.java @@ -2,6 +2,7 @@ import com.siriusxi.ms.store.api.core.product.dto.Product; import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Mono; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; @@ -12,7 +13,7 @@ * * @see ProductService * @author mohamed.taman - * @version v1.0 + * @version v4.0 * @since v3.0 codename Storm */ @RequestMapping("products") @@ -29,31 +30,5 @@ public interface ProductEndpoint extends ProductService { */ @Override @GetMapping(value = "{productId}", produces = APPLICATION_JSON_VALUE) - Product getProduct(@PathVariable("productId") int id); - - /** - * Sample usage: - * - *

curl -X POST $HOST:$PORT/products \ -H "Content-Type: application/json" --data \ - * '{"productId":123,"name":"product 123","weight":123}' - * - * @param body product to save. - * @return Product just created. - * @since v3.0 codename Storm - */ - @Override - @PostMapping(produces = APPLICATION_JSON_VALUE, consumes = APPLICATION_JSON_VALUE) - Product createProduct(@RequestBody Product body); - - /** - * Sample usage: - * - *

curl -X DELETE $HOST:$PORT/products/1 - * - * @param id to be deleted. - * @since v3.0 codename Storm - */ - @Override - @DeleteMapping("{productId}") - void deleteProduct(@PathVariable("productId") int id); + Mono getProduct(@PathVariable("productId") int id); } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductService.java b/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductService.java index 04a8b095..4a36ecce 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductService.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/core/product/ProductService.java @@ -1,6 +1,7 @@ package com.siriusxi.ms.store.api.core.product; import com.siriusxi.ms.store.api.core.product.dto.Product; +import reactor.core.publisher.Mono; /** * Interface that define the general service contract (methods) for the Product @@ -18,21 +19,21 @@ public interface ProductService { /** * Get the product with Id from repository. + * It is a Non-Blocking API. * * @param id is the product id that you are looking for. * @return the product, if found, else null. * @since v0.1 */ - Product getProduct(int id); + Mono getProduct(int id); /** * Add product to the repository. * * @param body product to save. - * @return just created product. * @since v0.1 */ - Product createProduct(Product body); + default Product createProduct(Product body){ return null;} /** * Delete the product from repository. @@ -41,5 +42,5 @@ public interface ProductService { * @param id to be deleted. * @since v0.1 */ - void deleteProduct(int id); + default void deleteProduct(int id){} } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationEndpoint.java b/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationEndpoint.java index 9fca72c5..a95e1c55 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationEndpoint.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationEndpoint.java @@ -1,9 +1,10 @@ package com.siriusxi.ms.store.api.core.recommendation; import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; -import org.springframework.web.bind.annotation.*; - -import java.util.List; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import reactor.core.publisher.Flux; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; /** @@ -13,7 +14,7 @@ * * @see RecommendationService * @author mohamed.taman - * @version v1.0 + * @version v4.0 * @since v3.0 codename Storm */ @RequestMapping("recommendations") @@ -29,31 +30,5 @@ public interface RecommendationEndpoint extends RecommendationService { * @since v3.0 codename Storm */ @GetMapping(produces = APPLICATION_JSON_VALUE) - List getRecommendations(@RequestParam("productId") int productId); - - /** - * Sample usage: - * - *

curl -X POST $HOST:$PORT/recommendations \ - * -H "Content-Type: application/json" --data \ - * '{"productId":123,"recommendationId":456,"author":"me","rate":5,"content":"yada, yada, yada" - * }' - * - * @param body the recommendation to add. - * @return currently created recommendation. - * @since v3.0 codename Storm - */ - @PostMapping(produces = APPLICATION_JSON_VALUE, consumes = APPLICATION_JSON_VALUE) - Recommendation createRecommendation(@RequestBody Recommendation body); - - /** - * Sample usage: - * - *

curl -X DELETE $HOST:$PORT/recommendations?productId=1 - * - * @param productId to delete recommendations for. - * @since version 0.1 - */ - @DeleteMapping - void deleteRecommendations(@RequestParam("productId") int productId); + Flux getRecommendations(@RequestParam("productId") int productId); } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationService.java b/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationService.java index 2037a9b0..bf9c5c49 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationService.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/core/recommendation/RecommendationService.java @@ -1,8 +1,7 @@ package com.siriusxi.ms.store.api.core.recommendation; import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; - -import java.util.List; +import reactor.core.publisher.Flux; /** * Interface that define the general service contract (methods) for the Recommendation @@ -18,13 +17,13 @@ */ public interface RecommendationService { /** - * Get all recommendations for specific product by product id. + * Get all recommendations for specific product by product id. It is a Non-Blocking API. * * @param productId that you are looking for its recommendations. * @return list of product recommendations, or empty list if there are no recommendations. * @since v0.1 */ - List getRecommendations(int productId); + Flux getRecommendations(int productId); /** * Create a new recommendation for a product. @@ -33,7 +32,9 @@ public interface RecommendationService { * @return currently created recommendation. * @since v0.1 */ - Recommendation createRecommendation(Recommendation body); + default Recommendation createRecommendation(Recommendation body) { + return null; + } /** * Delete all product recommendations. @@ -41,5 +42,5 @@ public interface RecommendationService { * @param productId to delete recommendations for. * @since v0.1 */ - void deleteRecommendations(int productId); + default void deleteRecommendations(int productId) {} } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewEndpoint.java b/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewEndpoint.java index 48dbcaea..dd724f5e 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewEndpoint.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewEndpoint.java @@ -2,6 +2,7 @@ import com.siriusxi.ms.store.api.core.review.dto.Review; import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Flux; import java.util.List; @@ -14,27 +15,12 @@ * * @see ReviewService * @author mohamed.taman - * @version v1.0 + * @version v4.0 * @since v3.0 codename Storm */ @RequestMapping("reviews") public interface ReviewEndpoint extends ReviewService { - /** - * Sample usage: - * - *

curl -X POST $HOST:$PORT/reviews \ - * -H "Content-Type: application/json" --data \ - * '{"productId":123,"reviewId":456,"author":"me","subject":"yada, yada, yada", - * "content":"yada, yada, yada"}' - * - * @param body review to be created. - * @return just created review. - * @since v3.0 codename Storm - */ - @PostMapping(produces = APPLICATION_JSON_VALUE, consumes = APPLICATION_JSON_VALUE) - Review createReview(@RequestBody Review body); - /** * Sample usage: * @@ -45,16 +31,5 @@ public interface ReviewEndpoint extends ReviewService { * @since v3.0 codename Storm */ @GetMapping(produces = APPLICATION_JSON_VALUE) - List getReviews(@RequestParam("productId") int productId); - - /** - * Sample usage: - * - *

curl -X DELETE $HOST:$PORT/review?productId=1 - * - * @param productId to delete its reviews. - * @since v3.0 codename Storm - */ - @DeleteMapping - void deleteReviews(@RequestParam("productId") int productId); + Flux getReviews(@RequestParam("productId") int productId); } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewService.java b/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewService.java index 7586ea26..20321f53 100644 --- a/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewService.java +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/core/review/ReviewService.java @@ -1,6 +1,7 @@ package com.siriusxi.ms.store.api.core.review; import com.siriusxi.ms.store.api.core.review.dto.Review; +import reactor.core.publisher.Flux; import java.util.List; @@ -13,18 +14,19 @@ * * * @author mohamed.taman - * @version v0.2 + * @version v4.0 * @since v0.1 */ public interface ReviewService { /** * Get all reviews for specific product by product id. + * It is a Non-Blocking API. * * @param productId that you are looking for its reviews. * @return list of reviews for this product, or empty list if there are no reviews. */ - List getReviews(int productId); + Flux getReviews(int productId); /** * Create a new review for a product. @@ -32,12 +34,12 @@ public interface ReviewService { * @param body review to be created. * @return just created review. */ - Review createReview(Review body); + default Review createReview(Review body){return null;} /** * Delete all product reviews. * * @param productId to delete its reviews. */ - void deleteReviews(int productId); + default void deleteReviews(int productId){} } diff --git a/store-api/src/main/java/com/siriusxi/ms/store/api/event/Event.java b/store-api/src/main/java/com/siriusxi/ms/store/api/event/Event.java new file mode 100644 index 00000000..462d379d --- /dev/null +++ b/store-api/src/main/java/com/siriusxi/ms/store/api/event/Event.java @@ -0,0 +1,29 @@ +package com.siriusxi.ms.store.api.event; + +import lombok.*; + +import java.time.LocalDateTime; + +import static java.time.LocalDateTime.now; +import static lombok.AccessLevel.NONE; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@Setter(NONE) +public class Event { + + public enum Type {CREATE, DELETE} + + private Event.Type eventType; + private K key; + private T data; + private LocalDateTime eventCreatedAt; + + public Event(Type eventType, K key, T data) { + this.eventType = eventType; + this.key = key; + this.data = data; + this.eventCreatedAt = now(); + } +} \ No newline at end of file diff --git a/store-build-chassis/pom.xml b/store-build-chassis/pom.xml index 989a9950..c0af229c 100644 --- a/store-build-chassis/pom.xml +++ b/store-build-chassis/pom.xml @@ -19,6 +19,7 @@ 14 + Hoxton.RELEASE UTF-8 UTF-8 @@ -47,6 +48,15 @@ ${springfox.swagger.version} + + + org.springframework.cloud + spring-cloud-dependencies + ${spring.cloud.version} + pom + import + + @@ -66,6 +76,13 @@ spring-boot-starter-webflux + + + org.springframework.boot + spring-boot-properties-migrator + runtime + + @@ -96,6 +113,10 @@ ${maven.surefire.plugin.version} --enable-preview + + **/*Tests.java + **/*Test.java + diff --git a/store-service-chassis/pom.xml b/store-service-chassis/pom.xml index 912d7bfc..bc0122db 100644 --- a/store-service-chassis/pom.xml +++ b/store-service-chassis/pom.xml @@ -23,7 +23,7 @@ - + org.springframework.boot spring-boot-devtools @@ -31,21 +31,35 @@ true + org.springframework.boot spring-boot-configuration-processor true + - + org.springframework.boot spring-boot-starter-actuator - + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + + @@ -65,11 +79,20 @@ test + io.projectreactor reactor-test test + + + + org.springframework.cloud + spring-cloud-stream-test-support + test + + diff --git a/store-service/src/main/java/com/siriusxi/ms/store/pcs/api/StoreController.java b/store-service/src/main/java/com/siriusxi/ms/store/pcs/api/StoreController.java index 360eb8ff..e918b62d 100644 --- a/store-service/src/main/java/com/siriusxi/ms/store/pcs/api/StoreController.java +++ b/store-service/src/main/java/com/siriusxi/ms/store/pcs/api/StoreController.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; @RestController @Log4j2 @@ -22,9 +23,10 @@ public StoreController(@Qualifier("StoreServiceImpl") StoreService storeService) this.storeService = storeService; } - /** {@inheritDoc} */ + /** {@inheritDoc} + * @return*/ @Override - public ProductAggregate getProduct(int id) { + public Mono getProduct(int id) { return storeService.getProduct(id); } diff --git a/store-service/src/main/java/com/siriusxi/ms/store/pcs/config/StoreConfiguration.java b/store-service/src/main/java/com/siriusxi/ms/store/pcs/config/StoreServiceConfiguration.java similarity index 52% rename from store-service/src/main/java/com/siriusxi/ms/store/pcs/config/StoreConfiguration.java rename to store-service/src/main/java/com/siriusxi/ms/store/pcs/config/StoreServiceConfiguration.java index d84591de..c683e55b 100644 --- a/store-service/src/main/java/com/siriusxi/ms/store/pcs/config/StoreConfiguration.java +++ b/store-service/src/main/java/com/siriusxi/ms/store/pcs/config/StoreServiceConfiguration.java @@ -1,6 +1,11 @@ package com.siriusxi.ms.store.pcs.config; +import com.siriusxi.ms.store.pcs.integration.StoreIntegration; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.actuate.health.CompositeReactiveHealthContributor; +import org.springframework.boot.actuate.health.ReactiveHealthContributor; +import org.springframework.boot.actuate.health.ReactiveHealthIndicator; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.client.RestTemplate; @@ -9,39 +14,65 @@ import springfox.documentation.service.Contact; import springfox.documentation.spring.web.plugins.Docket; +import java.util.Map; + import static java.util.Collections.emptyList; import static org.springframework.web.bind.annotation.RequestMethod.*; import static springfox.documentation.builders.RequestHandlerSelectors.basePackage; import static springfox.documentation.spi.DocumentationType.SWAGGER_2; @Configuration -public class StoreConfiguration { +public class StoreServiceConfiguration { + + private final StoreIntegration integration; + @Value("${api.common.version}") - String apiVersion; + private String apiVersion; @Value("${api.common.title}") - String apiTitle; + private String apiTitle; @Value("${api.common.description}") - String apiDescription; + private String apiDescription; @Value("${api.common.termsOfServiceUrl}") - String apiTermsOfServiceUrl; + private String apiTermsOfServiceUrl; @Value("${api.common.license}") - String apiLicense; + private String apiLicense; @Value("${api.common.licenseUrl}") - String apiLicenseUrl; + private String apiLicenseUrl; @Value("${api.common.contact.name}") - String apiContactName; + private String apiContactName; @Value("${api.common.contact.url}") - String apiContactUrl; + private String apiContactUrl; @Value("${api.common.contact.email}") - String apiContactEmail; + private String apiContactEmail; + + @Autowired + public StoreServiceConfiguration(StoreIntegration integration) { + this.integration = integration; + } + + @Bean(name = "Core System Microservices") + ReactiveHealthContributor coreServices() { + + ReactiveHealthIndicator productHealthIndicator = integration::getProductHealth; + ReactiveHealthIndicator recommendationHealthIndicator = integration::getRecommendationHealth; + ReactiveHealthIndicator reviewHealthIndicator = integration::getReviewHealth; + + Map allIndicators = + Map.of( + "Product Service", productHealthIndicator, + "Recommendation Service", recommendationHealthIndicator, + "Review Service", reviewHealthIndicator); + + return CompositeReactiveHealthContributor.fromMap(allIndicators); + } @Bean RestTemplate newRestClient() { @@ -66,13 +97,17 @@ Using the apis() and paths() methods, .paths(PathSelectors.any()) .build() /* - Using the globalResponseMessage() method, we ask SpringFox not to add any default HTTP response codes to the API documentation, such as 401 and 403, which we don't currently use. + Using the globalResponseMessage() method, we ask SpringFox not to add any default HTTP + response codes to the API documentation, such as 401 and 403, + which we don't currently use. */ .globalResponseMessage(POST, emptyList()) .globalResponseMessage(GET, emptyList()) .globalResponseMessage(DELETE, emptyList()) /* - The api* variables that are used to configure the Docket bean with general information about the API are initialized from the property file using Spring @Value annotations. + The api* variables that are used to configure the Docket bean with general + information about the API are initialized from the property file using + Spring @Value annotations. */ .apiInfo( new ApiInfo( diff --git a/store-service/src/main/java/com/siriusxi/ms/store/pcs/integration/StoreIntegration.java b/store-service/src/main/java/com/siriusxi/ms/store/pcs/integration/StoreIntegration.java index 58204b82..da4fc814 100644 --- a/store-service/src/main/java/com/siriusxi/ms/store/pcs/integration/StoreIntegration.java +++ b/store-service/src/main/java/com/siriusxi/ms/store/pcs/integration/StoreIntegration.java @@ -7,41 +7,49 @@ import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; import com.siriusxi.ms.store.api.core.review.ReviewService; import com.siriusxi.ms.store.api.core.review.dto.Review; +import com.siriusxi.ms.store.api.event.Event; import com.siriusxi.ms.store.util.exceptions.InvalidInputException; import com.siriusxi.ms.store.util.exceptions.NotFoundException; import com.siriusxi.ms.store.util.http.HttpErrorInfo; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.ParameterizedTypeReference; +import org.springframework.boot.actuate.health.Health; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Component; -import org.springframework.web.client.HttpClientErrorException; -import org.springframework.web.client.RestTemplate; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import static com.siriusxi.ms.store.api.event.Event.Type.CREATE; +import static com.siriusxi.ms.store.api.event.Event.Type.DELETE; +import static com.siriusxi.ms.store.pcs.integration.StoreIntegration.MessageSources; import static java.lang.String.valueOf; -import static org.springframework.http.HttpMethod.GET; +import static org.springframework.integration.support.MessageBuilder.withPayload; +import static reactor.core.publisher.Flux.empty; +@EnableBinding(MessageSources.class) @Component @Log4j2 public class StoreIntegration implements ProductService, RecommendationEndpoint, ReviewService { public static final String PRODUCT_ID_QUERY_PARAM = "?productId="; - - private final RestTemplate restTemplate; + private final WebClient webClient; private final ObjectMapper mapper; - + private final MessageSources messageSources; private final String productServiceUrl; private final String recommendationServiceUrl; private final String reviewServiceUrl; - @Autowired public StoreIntegration( - RestTemplate restTemplate, + WebClient.Builder webClient, ObjectMapper mapper, + MessageSources messageSources, @Value("${app.product-service.host}") String productServiceHost, @Value("${app.product-service.port}") int productServicePort, @Value("${app.recommendation-service.host}") String recommendationServiceHost, @@ -49,204 +57,196 @@ public StoreIntegration( @Value("${app.review-service.host}") String reviewServiceHost, @Value("${app.review-service.port}") int reviewServicePort) { - this.restTemplate = restTemplate; + this.webClient = webClient.build(); this.mapper = mapper; + this.messageSources = messageSources; var http = "http://"; productServiceUrl = http.concat(productServiceHost) .concat(":") - .concat(valueOf(productServicePort)) - .concat("/products/"); + .concat(valueOf(productServicePort)); recommendationServiceUrl = http.concat(recommendationServiceHost) .concat(":") - .concat(valueOf(recommendationServicePort)) - .concat("/recommendations"); + .concat(valueOf(recommendationServicePort)); reviewServiceUrl = http.concat(reviewServiceHost) .concat(":") - .concat(valueOf(reviewServicePort)) - .concat("/reviews"); + .concat(valueOf(reviewServicePort)); } @Override public Product createProduct(Product body) { - - try { - String url = productServiceUrl; - log.debug("Will post a new product to URL: {}", url); - - Product product = restTemplate.postForObject(url, body, Product.class); - log.debug("Created a product with id: {}", product != null ? product.getProductId() : -1); - - return product; - - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); - } + log.debug("Publishing a create event for a new product {}",body.toString()); + messageSources + .outputProducts() + .send(withPayload(new Event<>(CREATE, body.getProductId(), body)).build()); + return body; } @Override - public Product getProduct(int productId) { - - try { - String url = productServiceUrl + "/" + productId; - log.debug("Will call the getProduct API on URL: {}", url); + public Mono getProduct(int productId) { - Product product = restTemplate.getForObject(url, Product.class); - log.debug("Found a product with id: {}", product != null ? product.getProductId() : -1); + var url = productServiceUrl + .concat("/products/") + .concat(valueOf(productId)); - return product; + log.debug("Will call the getProduct API on URL: {}", url); - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); - } + return webClient + .get() + .uri(url) + .retrieve() + .bodyToMono(Product.class) + .log() + .onErrorMap(WebClientResponseException.class, this::handleException); } @Override public void deleteProduct(int productId) { - try { - String url = productServiceUrl + "/" + productId; - log.debug("Will call the deleteProduct API on URL: {}", url); - - restTemplate.delete(url); - - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); - } + log.debug("Publishing a delete event for product id {}", productId); + messageSources + .outputProducts() + .send(withPayload(new Event<>(DELETE, productId, null)).build()); } @Override public Recommendation createRecommendation(Recommendation body) { + log.debug("Publishing a create event for a new recommendation {}",body.toString()); - try { - String url = recommendationServiceUrl; - log.debug("Will post a new recommendation to URL: {}", url); - - Recommendation recommendation = restTemplate.postForObject(url, body, Recommendation.class); - log.debug("Created a recommendation with id: {}", - recommendation != null ? recommendation.getRecommendationId() : -1); - - return recommendation; + messageSources + .outputRecommendations() + .send(withPayload(new Event<>(CREATE, body.getProductId(), body)).build()); - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); - } + return body; } @Override - public List getRecommendations(int productId) { - - try { - String url = recommendationServiceUrl.concat(PRODUCT_ID_QUERY_PARAM).concat(valueOf(productId)); - - log.debug("Will call the getRecommendations API on URL: {}", url); - List recommendations = - restTemplate - .exchange(url, GET, null, new ParameterizedTypeReference>() {}) - .getBody(); - - log.debug( - "Found {} recommendations for a product with id: {}", recommendations != null ? recommendations.size() : 0, productId); - return recommendations; - - } catch (Exception ex) { - log.warn( - "Got an exception while requesting recommendations, return zero recommendations: {}", - ex.getMessage()); - return new ArrayList<>(); - } + public Flux getRecommendations(int productId) { + + var url = recommendationServiceUrl + .concat("/recommendations") + .concat(PRODUCT_ID_QUERY_PARAM) + .concat(valueOf(productId)); + + log.debug("Will call the getRecommendations API on URL: {}", url); + + /* Return an empty result if something goes wrong to make it possible + for the composite service to return partial responses + */ + return webClient + .get() + .uri(url) + .retrieve() + .bodyToFlux(Recommendation.class) + .log() + .onErrorResume(error -> empty()); } @Override public void deleteRecommendations(int productId) { - try { - String url = recommendationServiceUrl - .concat(PRODUCT_ID_QUERY_PARAM) - .concat(valueOf(productId)); - log.debug("Will call the deleteRecommendations API on URL: {}", url); - - restTemplate.delete(url); - - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); - } + messageSources + .outputRecommendations() + .send(withPayload(new Event<>(DELETE, productId, null)).build()); } @Override public Review createReview(Review body) { + messageSources + .outputReviews() + .send(withPayload(new Event<>(CREATE, body.getProductId(), body)).build()); + return body; + } - try { - String url = reviewServiceUrl; - log.debug("Will post a new review to URL: {}", url); + @Override + public Flux getReviews(int productId) { - var review = restTemplate.postForObject(url, body, Review.class); - log.debug("Created a review with id: {}", review != null ? review.getProductId() : 0); + var url = reviewServiceUrl + .concat("/reviews") + .concat(PRODUCT_ID_QUERY_PARAM) + .concat(valueOf(productId)); - return review; + log.debug("Will call the getReviews API on URL: {}", url); + + /* Return an empty result if something goes wrong to make it possible + for the composite service to return partial responses + */ + return webClient + .get() + .uri(url) + .retrieve() + .bodyToFlux(Review.class).log() + .onErrorResume(error -> empty()); - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); - } } @Override - public List getReviews(int productId) { + public void deleteReviews(int productId) { + messageSources + .outputReviews() + .send(withPayload(new Event<>(DELETE, productId, null)).build()); + } - try { - String url = reviewServiceUrl - .concat(PRODUCT_ID_QUERY_PARAM) - .concat(valueOf(productId)); - - log.debug("Will call the getReviews API on URL: {}", url); - List reviews = - restTemplate - .exchange(url, GET, null, new ParameterizedTypeReference>() {}) - .getBody(); - - log.debug("Found {} reviews for a product with id: {}", reviews != null ? reviews.size() : 0, productId); - return reviews; - - } catch (Exception ex) { - log.warn( - "Got an exception while requesting reviews, return zero reviews: {}", ex.getMessage()); - return new ArrayList<>(); - } + public Mono getProductHealth() { + return getHealth(productServiceUrl); } - @Override - public void deleteReviews(int productId) { - try { - String url = reviewServiceUrl - .concat(PRODUCT_ID_QUERY_PARAM) - .concat(valueOf(productId)); - log.debug("Will call the deleteReviews API on URL: {}", url); + public Mono getRecommendationHealth() { + return getHealth(recommendationServiceUrl); + } + + public Mono getReviewHealth() { + return getHealth(reviewServiceUrl); + } - restTemplate.delete(url); + private Mono getHealth(String url) { + url += "/actuator/health"; + log.debug("Will call the Health API on URL: {}", url); + return webClient.get().uri(url).retrieve().bodyToMono(String.class) + .map(s -> new Health.Builder().up().build()) + .onErrorResume(ex -> Mono.just(new Health.Builder().down(ex).build())) + .log(); + } - } catch (HttpClientErrorException ex) { - throw handleHttpClientException(ex); + private Throwable handleException(Throwable ex) { + if (!(ex instanceof WebClientResponseException wcre)) { + log.warn("Got a unexpected error: {}, will rethrow it", ex.toString()); + return ex; } - } - private RuntimeException handleHttpClientException(HttpClientErrorException ex) { - return switch (ex.getStatusCode()) { - case NOT_FOUND -> new NotFoundException(getErrorMessage(ex)); - case UNPROCESSABLE_ENTITY -> new InvalidInputException(getErrorMessage(ex)); + return switch (wcre.getStatusCode()) { + case NOT_FOUND -> new NotFoundException(getErrorMessage(wcre)); + case UNPROCESSABLE_ENTITY -> new InvalidInputException(getErrorMessage(wcre)); default -> { - log.warn("Got a unexpected HTTP error: {}, will rethrow it", ex.getStatusCode()); - log.warn("Error body: {}", ex.getResponseBodyAsString()); - throw ex;} + log.warn("Got a unexpected HTTP error: {}, will rethrow it", wcre.getStatusCode()); + log.warn("Error body: {}", wcre.getResponseBodyAsString()); + throw wcre;} }; } - private String getErrorMessage(HttpClientErrorException ex) { + private String getErrorMessage(WebClientResponseException ex) { try { return mapper.readValue(ex.getResponseBodyAsString(), HttpErrorInfo.class).message(); } catch (IOException ioException) { return ex.getMessage(); } } + + public interface MessageSources { + + String OUTPUT_PRODUCTS = "output-products"; + String OUTPUT_RECOMMENDATIONS = "output-recommendations"; + String OUTPUT_REVIEWS = "output-reviews"; + + @Output(OUTPUT_PRODUCTS) + MessageChannel outputProducts(); + + @Output(OUTPUT_RECOMMENDATIONS) + MessageChannel outputRecommendations(); + + @Output(OUTPUT_REVIEWS) + MessageChannel outputReviews(); + } } diff --git a/store-service/src/main/java/com/siriusxi/ms/store/pcs/service/StoreServiceImpl.java b/store-service/src/main/java/com/siriusxi/ms/store/pcs/service/StoreServiceImpl.java index 1a2a5f64..d8ea44d8 100644 --- a/store-service/src/main/java/com/siriusxi/ms/store/pcs/service/StoreServiceImpl.java +++ b/store-service/src/main/java/com/siriusxi/ms/store/pcs/service/StoreServiceImpl.java @@ -9,11 +9,11 @@ import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; import com.siriusxi.ms.store.api.core.review.dto.Review; import com.siriusxi.ms.store.pcs.integration.StoreIntegration; -import com.siriusxi.ms.store.util.exceptions.NotFoundException; import com.siriusxi.ms.store.util.http.ServiceUtil; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; import java.util.List; import java.util.stream.Collectors; @@ -37,16 +37,17 @@ public void createProduct(ProductAggregate body) { try { log.debug( - "createCompositeProduct: creates a new composite entity for id: {}", body.getProductId()); + "createCompositeProduct: creates a new composite entity for productId: {}", + body.getProductId()); - Product product = new Product(body.getProductId(), body.getName(), body.getWeight(), null); + var product = new Product(body.getProductId(), body.getName(), body.getWeight(), null); integration.createProduct(product); if (body.getRecommendations() != null) { body.getRecommendations() .forEach( r -> { - Recommendation recommendation = + var recommendation = new Recommendation( body.getProductId(), r.getRecommendationId(), @@ -73,45 +74,49 @@ public void createProduct(ProductAggregate body) { integration.createReview(review); }); } - log.debug( - "createCompositeProduct: composite entites created for id: {}", body.getProductId()); + "createCompositeProduct: composite entities created for productId: {}", + body.getProductId()); } catch (RuntimeException re) { - log.warn("createCompositeProduct failed", re); + log.warn("createCompositeProduct failed: {}", re.toString()); throw re; } } @Override - public ProductAggregate getProduct(int id) { - log.debug("getCompositeProduct: lookup a product aggregate for id: {}", id); - - Product product = integration.getProduct(id); - if (product == null) throw new NotFoundException("No product found for id: " + id); - - List recommendations = integration.getRecommendations(id); - - List reviews = integration.getReviews(id); - - log.debug("getCompositeProduct: aggregate entity found for id: {}", id); - - return createProductAggregate( - product, recommendations, reviews, serviceUtil.getServiceAddress()); + public Mono getProduct(int productId) { + return Mono.zip( + values -> + createProductAggregate( + (Product) values[0], + (List) values[1], + (List) values[2], + serviceUtil.getServiceAddress()), + integration.getProduct(productId), + integration.getRecommendations(productId).collectList(), + integration.getReviews(productId).collectList()) + .doOnError(ex -> log.warn("getCompositeProduct failed: {}", ex.toString())) + .log(); } @Override - public void deleteProduct(int id) { + public void deleteProduct(int productId) { - log.debug("deleteCompositeProduct: Deletes a product aggregate for id: {}", id); + try { - integration.deleteProduct(id); + log.debug("deleteCompositeProduct: Deletes a product aggregate for productId: {}", productId); - integration.deleteRecommendations(id); + integration.deleteProduct(productId); + integration.deleteRecommendations(productId); + integration.deleteReviews(productId); - integration.deleteReviews(id); + log.debug("deleteCompositeProduct: aggregate entities deleted for productId: {}", productId); - log.debug("getCompositeProduct: aggregate entities deleted for id: {}", id); + } catch (RuntimeException re) { + log.warn("deleteCompositeProduct failed: {}", re.toString()); + throw re; + } } private ProductAggregate createProductAggregate( @@ -121,7 +126,7 @@ private ProductAggregate createProductAggregate( String serviceAddress) { // 1. Setup product info - int id = product.getProductId(); + int productId = product.getProductId(); String name = product.getName(); int weight = product.getWeight(); @@ -150,15 +155,15 @@ private ProductAggregate createProductAggregate( // 4. Create info regarding the involved microservices addresses String productAddress = product.getServiceAddress(); String reviewAddress = - (reviews != null && !reviews.isEmpty()) ? reviews.get(0).getServiceAddress() : ""; + (reviews != null && reviews.size() > 0) ? reviews.get(0).getServiceAddress() : ""; String recommendationAddress = - (recommendations != null && !recommendations.isEmpty()) + (recommendations != null && recommendations.size() > 0) ? recommendations.get(0).getServiceAddress() : ""; ServiceAddresses serviceAddresses = new ServiceAddresses(serviceAddress, productAddress, reviewAddress, recommendationAddress); return new ProductAggregate( - id, name, weight, recommendationSummaries, reviewSummaries, serviceAddresses); + productId, name, weight, recommendationSummaries, reviewSummaries, serviceAddresses); } } diff --git a/store-service/src/main/resources/application.yaml b/store-service/src/main/resources/application.yaml index 2e7c1672..fba8a697 100644 --- a/store-service/src/main/resources/application.yaml +++ b/store-service/src/main/resources/application.yaml @@ -2,6 +2,35 @@ spring: application: name: store-service + cloud: + stream: + default-binder: rabbit + default: + contentType: application/json + bindings: + output-products: + destination: products + producer: + required-groups: auditGroup + output-recommendations: + destination: recommendations + producer: + required-groups: auditGroup + output-reviews: + destination: reviews + producer: + required-groups: auditGroup + kafka: + binder: + brokers: 127.0.0.1 + defaultBrokerPort: 9092 + + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: guest + password: guest + server: port: 9080 @@ -12,6 +41,9 @@ logging: com.siriusxi.ms.store: DEBUG management: + info: + git: + enabled: true endpoints: web: exposure: @@ -19,6 +51,8 @@ management: endpoint: shutdown: enabled: true + health: + show-details: "ALWAYS" # Custom configurations app: @@ -35,8 +69,16 @@ app: # Swagger properties api: common: - version: 1.0.0 - title: Springy Store μServices + version: 4.0 + title: "Springy Store μServices" + termsOfServiceUrl: https://mohamed-taman.github.io/Springy-Store-Microservices/ + license: "MIT License" + licenseUrl: "https://github.com/mohamed-taman/Springy-Store-Microservices/blob/master/LICENSE" + + contact: + name: "Mohamed Taman" + url: "https://twitter.com/_tamanm" + email: "mohamed.taman@mail.com" description: | **Springy Store** is a conceptual simple μServices-based project using the latest cutting-edge technologies, to demonstrate how the store is created to be a @@ -44,14 +86,6 @@ api: This project μServices are developed based on Spring Boot & Cloud framework, that implement **cloud-native** intuitive, **design patterns** and **best practices**. - termsOfServiceUrl: https://mohamed-taman.github.io/Springy-Store-Microservices/ - license: MIT License - licenseUrl: https://github.com/mohamed-taman/Springy-Store-Microservices/blob/master/LICENSE - - contact: - name: Mohamed Taman - url: https://twitter.com/_tamanm - email: mohamed.taman@mail.com product-composite: get-composite-product: @@ -96,6 +130,13 @@ spring: profiles: docker jmx: enabled: false + rabbitmq: + host: rabbitmq + cloud: + stream: + kafka: + binder: + brokers: kafka server: port: 8080 diff --git a/store-service/src/test/java/com/siriusxi/ms/store/pcs/IsSameEvent.java b/store-service/src/test/java/com/siriusxi/ms/store/pcs/IsSameEvent.java new file mode 100644 index 00000000..fb29bd2e --- /dev/null +++ b/store-service/src/test/java/com/siriusxi/ms/store/pcs/IsSameEvent.java @@ -0,0 +1,81 @@ +package com.siriusxi.ms.store.pcs; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.siriusxi.ms.store.api.event.Event; +import lombok.extern.log4j.Log4j2; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +@Log4j2 +class IsSameEvent extends TypeSafeMatcher { + + private final ObjectMapper mapper = new ObjectMapper(); + + private Event expectedEvent; + + + private IsSameEvent(Event expectedEvent) { + this.expectedEvent = expectedEvent; + } + + @Override + protected boolean matchesSafely(String eventAsJson) { + + if (expectedEvent == null) return false; + + log.trace("Convert the following json string to a map: {}", eventAsJson); + Map mapEvent = convertJsonStringToMap(eventAsJson); + mapEvent.remove("eventCreatedAt"); + + Map mapExpectedEvent = getMapWithoutCreatedAt(expectedEvent); + + log.trace("Got the map: {}", mapEvent); + log.trace("Compare to the expected map: {}", mapExpectedEvent); + return mapEvent.equals(mapExpectedEvent); + } + + @Override + public void describeTo(Description description) { + String expectedJson = convertObjectToJsonString(expectedEvent); + description.appendText("expected to look like " + expectedJson); + } + + public static Matcher sameEventExceptCreatedAt(Event expectedEvent) { + return new IsSameEvent(expectedEvent); + } + + private Map getMapWithoutCreatedAt(Event event) { + Map mapEvent = convertObjectToMap(event); + mapEvent.remove("eventCreatedAt"); + return mapEvent; + } + + private Map convertObjectToMap(Object object) { + JsonNode node = mapper.convertValue(object, JsonNode.class); + return mapper.convertValue(node, Map.class); + } + + private String convertObjectToJsonString(Object object) { + try { + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private Map convertJsonStringToMap(String eventAsJson) { + try { + return mapper.readValue(eventAsJson, new TypeReference(){}); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/store-service/src/test/java/com/siriusxi/ms/store/pcs/IsSameEventTests.java b/store-service/src/test/java/com/siriusxi/ms/store/pcs/IsSameEventTests.java new file mode 100644 index 00000000..debdfe79 --- /dev/null +++ b/store-service/src/test/java/com/siriusxi/ms/store/pcs/IsSameEventTests.java @@ -0,0 +1,38 @@ +package com.siriusxi.ms.store.pcs; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.siriusxi.ms.store.api.core.product.dto.Product; +import com.siriusxi.ms.store.api.event.Event; +import org.junit.jupiter.api.Test; + +import static com.siriusxi.ms.store.api.event.Event.Type.CREATE; +import static com.siriusxi.ms.store.api.event.Event.Type.DELETE; +import static com.siriusxi.ms.store.pcs.IsSameEvent.sameEventExceptCreatedAt; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +class IsSameEventTests { + + ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testEventObjectCompare() throws JsonProcessingException { + + /* + Event #1 and #2 are the same event, but occurs as different times + Event #3 and #4 are different events + */ + Event event1 = new Event<>(CREATE, 1, new Product(1, "name", 1, null)); + Event event2 = new Event<>(CREATE, 1, new Product(1, "name", 1, null)); + Event event3 = new Event<>(DELETE, 1, null); + Event event4 = new Event<>(CREATE, 1, new Product(2, "name", 1, null)); + + String event1JSon = mapper.writeValueAsString(event1); + + assertThat(event1JSon, is(sameEventExceptCreatedAt(event2))); + assertThat(event1JSon, not(sameEventExceptCreatedAt(event3))); + assertThat(event1JSon, not(sameEventExceptCreatedAt(event4))); + } +} diff --git a/store-service/src/test/java/com/siriusxi/ms/store/pcs/MessagingTests.java b/store-service/src/test/java/com/siriusxi/ms/store/pcs/MessagingTests.java new file mode 100644 index 00000000..90a279f9 --- /dev/null +++ b/store-service/src/test/java/com/siriusxi/ms/store/pcs/MessagingTests.java @@ -0,0 +1,186 @@ +package com.siriusxi.ms.store.pcs; + +import com.siriusxi.ms.store.api.composite.dto.ProductAggregate; +import com.siriusxi.ms.store.api.composite.dto.RecommendationSummary; +import com.siriusxi.ms.store.api.composite.dto.ReviewSummary; +import com.siriusxi.ms.store.api.core.product.dto.Product; +import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; +import com.siriusxi.ms.store.api.core.review.dto.Review; +import com.siriusxi.ms.store.api.event.Event; +import com.siriusxi.ms.store.pcs.integration.StoreIntegration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Mono; + +import java.util.concurrent.BlockingQueue; + +import static com.siriusxi.ms.store.api.event.Event.Type.CREATE; +import static com.siriusxi.ms.store.api.event.Event.Type.DELETE; +import static com.siriusxi.ms.store.pcs.IsSameEvent.sameEventExceptCreatedAt; +import static java.lang.String.valueOf; +import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; +import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat; +import static org.springframework.http.HttpStatus.OK; + +@SpringBootTest(webEnvironment = RANDOM_PORT) +class MessagingTests { + + public static final String BASE_URL = "/store/api/v1/products/"; + + BlockingQueue> queueProducts = null; + BlockingQueue> queueRecommendations = null; + BlockingQueue> queueReviews = null; + + @Autowired private WebTestClient client; + @Autowired private StoreIntegration.MessageSources channels; + @Autowired private MessageCollector collector; + + @BeforeEach + public void setUp() { + queueProducts = getQueue(channels.outputProducts()); + queueRecommendations = getQueue(channels.outputRecommendations()); + queueReviews = getQueue(channels.outputReviews()); + } + + @Test + public void createCompositeProduct1() { + + ProductAggregate composite = new ProductAggregate(1, "name", 1, null, null, null); + postAndVerifyProduct(composite); + + // Assert one expected new product events queued up + assertEquals(1, queueProducts.size()); + + Event expectedEvent = + new Event<>( + CREATE, + composite.getProductId(), + new Product( + composite.getProductId(), composite.getName(), composite.getWeight(), null)); + assertThat(queueProducts, is(receivesPayloadThat(sameEventExceptCreatedAt(expectedEvent)))); + + // Assert none recommendations and review events + assertEquals(0, queueRecommendations.size()); + assertEquals(0, queueReviews.size()); + } + + @Test + public void createCompositeProduct2() { + + ProductAggregate composite = + new ProductAggregate( + 1, + "name", + 1, + singletonList(new RecommendationSummary(1, "a", 1, "c")), + singletonList(new ReviewSummary(1, "a", "s", "c")), + null); + + postAndVerifyProduct(composite); + + // Assert one create product event queued up + assertEquals(1, queueProducts.size()); + + Event expectedProductEvent = + new Event<>( + CREATE, + composite.getProductId(), + new Product( + composite.getProductId(), composite.getName(), composite.getWeight(), null)); + assertThat(queueProducts, receivesPayloadThat(sameEventExceptCreatedAt(expectedProductEvent))); + + // Assert one create recommendation event queued up + assertEquals(1, queueRecommendations.size()); + + RecommendationSummary rec = composite.getRecommendations().get(0); + Event expectedRecommendationEvent = + new Event<>( + CREATE, + composite.getProductId(), + new Recommendation( + composite.getProductId(), + rec.getRecommendationId(), + rec.getAuthor(), + rec.getRate(), + rec.getContent(), + null)); + assertThat( + queueRecommendations, + receivesPayloadThat(sameEventExceptCreatedAt(expectedRecommendationEvent))); + + // Assert one create review event queued up + assertEquals(1, queueReviews.size()); + + ReviewSummary rev = composite.getReviews().get(0); + Event expectedReviewEvent = + new Event<>( + CREATE, + composite.getProductId(), + new Review( + composite.getProductId(), + rev.getReviewId(), + rev.getAuthor(), + rev.getSubject(), + rev.getContent(), + null)); + + assertThat(queueReviews, receivesPayloadThat(sameEventExceptCreatedAt(expectedReviewEvent))); + } + + @Test + public void deleteCompositeProduct() { + + deleteAndVerifyProduct(1); + + // Assert one delete product event queued up + assertEquals(1, queueProducts.size()); + + Event expectedEvent = new Event<>(DELETE, 1, null); + + assertThat(queueProducts, is(receivesPayloadThat(sameEventExceptCreatedAt(expectedEvent)))); + + // Assert one delete recommendation event queued up + assertEquals(1, queueRecommendations.size()); + + Event expectedRecommendationEvent = new Event<>(DELETE, 1, null); + assertThat( + queueRecommendations, + receivesPayloadThat(sameEventExceptCreatedAt(expectedRecommendationEvent))); + + // Assert one delete review event queued up + assertEquals(1, queueReviews.size()); + + Event expectedReviewEvent = new Event<>(DELETE, 1, null); + assertThat(queueReviews, receivesPayloadThat(sameEventExceptCreatedAt(expectedReviewEvent))); + } + + private BlockingQueue> getQueue(MessageChannel messageChannel) { + return collector.forChannel(messageChannel); + } + + private void postAndVerifyProduct(ProductAggregate compositeProduct) { + client + .post() + .uri(BASE_URL) + .body(Mono.just(compositeProduct), ProductAggregate.class) + .exchange() + .expectStatus().isEqualTo(OK); + } + + private void deleteAndVerifyProduct(int productId) { + client.delete() + .uri(BASE_URL.concat(valueOf(productId))) + .exchange() + .expectStatus().isEqualTo(OK); + } +} diff --git a/store-service/src/test/java/com/siriusxi/ms/store/pcs/ReactorTests.java b/store-service/src/test/java/com/siriusxi/ms/store/pcs/ReactorTests.java new file mode 100644 index 00000000..65d3ceb2 --- /dev/null +++ b/store-service/src/test/java/com/siriusxi/ms/store/pcs/ReactorTests.java @@ -0,0 +1,39 @@ +package com.siriusxi.ms.store.pcs; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class ReactorTests { + + @Test + public void TestFlux() { + + List list = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .filter(n -> n % 2 == 0) + .map(n -> n * 2) + .log() + .subscribe(list::add); + + assertThat(list).containsExactly(4, 8); + } + + @Test + public void TestFluxBlocking() { + + List list = Flux.just(1, 2, 3, 4) + .filter(n -> n % 2 == 0) + .map(n -> n * 2) + .log() + .collectList().block(); + + assertThat(list).containsExactly(4, 8); + } + +} diff --git a/store-service/src/test/java/com/siriusxi/ms/store/pcs/StoreServiceApplicationTests.java b/store-service/src/test/java/com/siriusxi/ms/store/pcs/StoreServiceApplicationTests.java index 4eec94b6..ed55d7bb 100644 --- a/store-service/src/test/java/com/siriusxi/ms/store/pcs/StoreServiceApplicationTests.java +++ b/store-service/src/test/java/com/siriusxi/ms/store/pcs/StoreServiceApplicationTests.java @@ -1,8 +1,5 @@ package com.siriusxi.ms.store.pcs; -import com.siriusxi.ms.store.api.composite.dto.ProductAggregate; -import com.siriusxi.ms.store.api.composite.dto.RecommendationSummary; -import com.siriusxi.ms.store.api.composite.dto.ReviewSummary; import com.siriusxi.ms.store.api.core.product.dto.Product; import com.siriusxi.ms.store.api.core.recommendation.dto.Recommendation; import com.siriusxi.ms.store.api.core.review.dto.Review; @@ -17,6 +14,7 @@ import org.springframework.http.HttpStatus; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.test.web.reactive.server.WebTestClient.BodyContentSpec; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static java.util.Collections.singletonList; @@ -33,27 +31,27 @@ class StoreServiceApplicationTests { private static final int PRODUCT_ID_NOT_FOUND = 2; private static final int PRODUCT_ID_INVALID = 3; - @Autowired - private WebTestClient client; + @Autowired private WebTestClient client; - @MockBean - private StoreIntegration storeIntegration; + @MockBean private StoreIntegration storeIntegration; @BeforeEach void setUp() { when(storeIntegration.getProduct(PRODUCT_ID_OK)) - .thenReturn(new Product(PRODUCT_ID_OK, "name", 1, "mock-address")); + .thenReturn(Mono.just(new Product(PRODUCT_ID_OK, "name", 1, "mock-address"))); when(storeIntegration.getRecommendations(PRODUCT_ID_OK)) .thenReturn( - singletonList( - new Recommendation(PRODUCT_ID_OK, 1, "author", 1, "content", "mock address"))); + Flux.fromIterable( + singletonList( + new Recommendation(PRODUCT_ID_OK, 1, "author", 1, "content", "mock address")))); when(storeIntegration.getReviews(PRODUCT_ID_OK)) .thenReturn( - singletonList( - new Review(PRODUCT_ID_OK, 1, "author", "subject", "content", "mock address"))); + Flux.fromIterable( + singletonList( + new Review(PRODUCT_ID_OK, 1, "author", "subject", "content", "mock address")))); when(storeIntegration.getProduct(PRODUCT_ID_NOT_FOUND)) .thenThrow(new NotFoundException("NOT FOUND: " + PRODUCT_ID_NOT_FOUND)); @@ -62,45 +60,6 @@ void setUp() { .thenThrow(new InvalidInputException("INVALID: " + PRODUCT_ID_INVALID)); } - @Test - public void createCompositeProduct1() { - - var compositeProduct = new ProductAggregate(1, "name", 1, null, null, null); - - postAndVerifyProductIsCreated(compositeProduct); - } - - @Test - public void createCompositeProduct2() { - var compositeProduct = - new ProductAggregate( - 1, - "name", - 1, - singletonList(new RecommendationSummary(1, "a", 1, "c")), - singletonList(new ReviewSummary(1, "a", "s", "c")), - null); - - postAndVerifyProductIsCreated(compositeProduct); - } - - @Test - public void deleteCompositeProduct() { - var compositeProduct = - new ProductAggregate( - 1, - "name", - 1, - singletonList(new RecommendationSummary(1, "a", 1, "c")), - singletonList(new ReviewSummary(1, "a", "s", "c")), - null); - - postAndVerifyProductIsCreated(compositeProduct); - - deleteAndVerifyProductIsDeleted(compositeProduct.getProductId()); - deleteAndVerifyProductIsDeleted(compositeProduct.getProductId()); - } - @Test public void getProductById() { @@ -145,18 +104,4 @@ private BodyContentSpec getAndVerifyProduct(int productId, HttpStatus expectedSt .contentType(APPLICATION_JSON) .expectBody(); } - - private void postAndVerifyProductIsCreated(ProductAggregate compositeProduct) { - client - .post() - .uri(BASE_URL) - .body(Mono.just(compositeProduct), ProductAggregate.class) - .exchange() - .expectStatus() - .isEqualTo(OK); - } - - private void deleteAndVerifyProductIsDeleted(int productId) { - client.delete().uri(BASE_URL + productId).exchange().expectStatus().isEqualTo(OK); - } } diff --git a/store-utils/src/main/java/com/siriusxi/ms/store/util/exceptions/EventProcessingException.java b/store-utils/src/main/java/com/siriusxi/ms/store/util/exceptions/EventProcessingException.java new file mode 100644 index 00000000..a958ba95 --- /dev/null +++ b/store-utils/src/main/java/com/siriusxi/ms/store/util/exceptions/EventProcessingException.java @@ -0,0 +1,18 @@ +package com.siriusxi.ms.store.util.exceptions; + +public class EventProcessingException extends RuntimeException { + public EventProcessingException() { + } + + public EventProcessingException(String message) { + super(message); + } + + public EventProcessingException(String message, Throwable cause) { + super(message, cause); + } + + public EventProcessingException(Throwable cause) { + super(cause); + } +} \ No newline at end of file diff --git a/test-em-all.sh b/test-em-all.sh index d064668f..9e02cb3a 100644 --- a/test-em-all.sh +++ b/test-em-all.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash ## Author: Mohamed Taman -## version: v3.0 +## version: v4.0 ### Sample usage: # # for local run @@ -8,12 +8,17 @@ # with docker compose # HOST=localhost PORT=8080 ./test-em-all.bash start stop # -echo -e "Starting [Springy Store] full functionality testing....\n" +echo -e "Starting [Springy Store] full functionality [Blackbox] testing....\n" : ${HOST=localhost} : ${PORT=8080} +: ${PROD_ID_REVS_RECS=2} +: ${PROD_ID_NOT_FOUND=14} +: ${PROD_ID_NO_RECS=114} +: ${PROD_ID_NO_REVS=214} BASE_URL="/store/api/v1/products" + function assertCurl() { local expectedHttpCode=$1 @@ -30,11 +35,12 @@ function assertCurl() { else echo "Test OK (HTTP Code: $httpCode, $RESPONSE)" fi + return 0 else echo "Test FAILED, EXPECTED HTTP Code: $expectedHttpCode, GOT: $httpCode, WILL ABORT!" echo "- Failing command: $curlCmd" echo "- Response Body: $RESPONSE" - exit 1 + return 1 fi } @@ -46,9 +52,10 @@ function assertEqual() { if [[ "$actual" = "$expected" ]] then echo "Test OK (actual value: $actual)" + return 0 else echo "Test FAILED, EXPECTED VALUE: $expected, ACTUAL VALUE: $actual, WILL ABORT" - exit 1 + return 1 fi } @@ -82,7 +89,52 @@ function waitForService() { done } -function createProduct() { +function testCompositeCreated() { + + # Expect that the Product Composite for productId $PROD_ID_REVS_RECS + # has been created with three recommendations and three reviews + if ! assertCurl 200 "curl http://${HOST}:${PORT}${BASE_URL}/${PROD_ID_REVS_RECS} -s" + then + echo -n "FAIL" + return 1 + fi + + set +e + assertEqual "$PROD_ID_REVS_RECS" $(echo ${RESPONSE} | jq .productId) + if [[ "$?" -eq "1" ]] ; then return 1; fi + + assertEqual 3 $(echo ${RESPONSE} | jq ".recommendations | length") + if [[ "$?" -eq "1" ]] ; then return 1; fi + + assertEqual 3 $(echo ${RESPONSE} | jq ".reviews | length") + if [[ "$?" -eq "1" ]] ; then return 1; fi + + set -e +} + +function waitForMessageProcessing() { + echo "Wait for messages to be processed... " + + # Give background processing some time to complete... + sleep 1 + + n=0 + until testCompositeCreated + do + n=$((n + 1)) + if [[ ${n} == 40 ]] + then + echo " Give up" + exit 1 + else + sleep 6 + echo -n ", retry #$n " + fi + done + echo "All messages are now processed!" +} + +function recreateComposite() { local productId=$1 local composite=$2 @@ -92,38 +144,43 @@ function createProduct() { function setupTestData() { - body=\ -'{"productId":1,"name":"product 1","weight":1, "recommendations":[ - {"recommendationId":1,"author":"author 1","rate":1,"content":"content 1"}, - {"recommendationId":2,"author":"author 2","rate":2,"content":"content 2"}, - {"recommendationId":3,"author":"author 3","rate":3,"content":"content 3"} - ], "reviews":[ - {"reviewId":1,"author":"author 1","subject":"subject 1","content":"content 1"}, - {"reviewId":2,"author":"author 2","subject":"subject 2","content":"content 2"}, - {"reviewId":3,"author":"author 3","subject":"subject 3","content":"content 3"} - ]}' - createProduct 1 "$body" - - body=\ -'{"productId":113,"name":"product 113","weight":113, "reviews":[ + body="{\"productId\":$PROD_ID_NO_RECS" + body+=\ +',"name":"product name A","weight":100, "reviews":[ {"reviewId":1,"author":"author 1","subject":"subject 1","content":"content 1"}, {"reviewId":2,"author":"author 2","subject":"subject 2","content":"content 2"}, {"reviewId":3,"author":"author 3","subject":"subject 3","content":"content 3"} ]}' - createProduct 113 "$body" + recreateComposite "$PROD_ID_NO_RECS" "$body" - body=\ -'{"productId":213,"name":"product 213","weight":213, "recommendations":[ + body="{\"productId\":$PROD_ID_NO_REVS" + body+=\ +',"name":"product name B","weight":200, "recommendations":[ {"recommendationId":1,"author":"author 1","rate":1,"content":"content 1"}, {"recommendationId":2,"author":"author 2","rate":2,"content":"content 2"}, {"recommendationId":3,"author":"author 3","rate":3,"content":"content 3"} ]}' - createProduct 213 "$body" + recreateComposite "$PROD_ID_NO_REVS" "$body" + + + body="{\"productId\":$PROD_ID_REVS_RECS" + body+=\ +',"name":"product name C","weight":300, "recommendations":[ + {"recommendationId":1,"author":"author 1","rate":1,"content":"content 1"}, + {"recommendationId":2,"author":"author 2","rate":2,"content":"content 2"}, + {"recommendationId":3,"author":"author 3","rate":3,"content":"content 3"} + ], "reviews":[ + {"reviewId":1,"author":"author 1","subject":"subject 1","content":"content 1"}, + {"reviewId":2,"author":"author 2","subject":"subject 2","content":"content 2"}, + {"reviewId":3,"author":"author 3","subject":"subject 3","content":"content 3"} + ]}' + + recreateComposite 1 "$body" } set -e -echo "Start:" `date` +echo "Start Tests:" `date` echo "HOST=${HOST}" echo "PORT=${PORT}" @@ -131,34 +188,36 @@ echo "PORT=${PORT}" if [[ $@ == *"start"* ]] then echo "Restarting the test environment..." - echo "$ docker-compose down" - docker-compose down + echo "$ docker-compose -p ssm down --remove-orphans" + docker-compose -p ssm down --remove-orphans echo "$ docker-compose -p ssm up -d" docker-compose -p ssm up -d fi -waitForService curl -X DELETE http://${HOST}:${PORT}${BASE_URL}/13 +waitForService curl http://${HOST}:${PORT}/actuator/health setupTestData +waitForMessageProcessing + # Verify that a normal request works, expect three recommendations and three reviews -assertCurl 200 "curl http://$HOST:$PORT${BASE_URL}/1 -s" -assertEqual 1 $(echo ${RESPONSE} | jq .productId) +assertCurl 200 "curl http://$HOST:$PORT${BASE_URL}/$PROD_ID_REVS_RECS -s" +assertEqual ${PROD_ID_REVS_RECS} $(echo ${RESPONSE} | jq .productId) assertEqual 3 $(echo ${RESPONSE} | jq ".recommendations | length") assertEqual 3 $(echo ${RESPONSE} | jq ".reviews | length") # Verify that a 404 (Not Found) error is returned for a non existing productId (13) -assertCurl 404 "curl http://$HOST:$PORT${BASE_URL}/13 -s" +assertCurl 404 "curl http://$HOST:$PORT${BASE_URL}/$PROD_ID_NOT_FOUND -s" # Verify that no recommendations are returned for productId 113 -assertCurl 200 "curl http://$HOST:$PORT${BASE_URL}/113 -s" -assertEqual 113 $(echo ${RESPONSE} | jq .productId) +assertCurl 200 "curl http://$HOST:$PORT${BASE_URL}/$PROD_ID_NO_RECS -s" +assertEqual ${PROD_ID_NO_RECS} $(echo ${RESPONSE} | jq .productId) assertEqual 0 $(echo ${RESPONSE} | jq ".recommendations | length") assertEqual 3 $(echo ${RESPONSE} | jq ".reviews | length") # Verify that no reviews are returned for productId 213 -assertCurl 200 "curl http://$HOST:$PORT${BASE_URL}/213 -s" -assertEqual 213 $(echo ${RESPONSE} | jq .productId) +assertCurl 200 "curl http://$HOST:$PORT${BASE_URL}/$PROD_ID_NO_REVS -s" +assertEqual ${PROD_ID_NO_REVS} $(echo ${RESPONSE} | jq .productId) assertEqual 3 $(echo ${RESPONSE} | jq ".recommendations | length") assertEqual 0 $(echo ${RESPONSE} | jq ".reviews | length") @@ -170,11 +229,11 @@ assertEqual "\"Invalid productId: -1\"" "$(echo ${RESPONSE} | jq .message)" assertCurl 400 "curl http://$HOST:$PORT${BASE_URL}/invalidProductId -s" assertEqual "\"Type mismatch.\"" "$(echo ${RESPONSE} | jq .message)" +echo "End, all tests OK:" `date` + if [[ $@ == *"stop"* ]] then echo "We are done, stopping the test environment..." - echo "$ docker-compose down" - docker-compose -p ssm down -fi - -echo "End:" `date` \ No newline at end of file + echo "$ docker-compose down --remove-orphans" + docker-compose -p ssm down --remove-orphans +fi \ No newline at end of file