diff --git a/cache-invalidation-2/pom.xml b/cache-invalidation-2/pom.xml new file mode 100644 index 00000000..6d476ee9 --- /dev/null +++ b/cache-invalidation-2/pom.xml @@ -0,0 +1,211 @@ + + + 4.0.0 + org.acme + code-with-quarkus + 1.0.0-SNAPSHOT + + + 3.13.0 + 21 + UTF-8 + UTF-8 + quarkus-bom + io.quarkus.platform + 3.15.1 + true + 3.3.1 + 15.0.8.Final + 2.7.3.Final + + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + + + + + io.quarkus + quarkus-rest + + + io.quarkus + quarkus-hibernate-orm + + + io.quarkus + quarkus-rest-jackson + + + io.quarkus + quarkus-jdbc-postgresql + + + io.quarkus + quarkus-arc + + + io.debezium + debezium-embedded + ${version.debezium} + + + io.debezium + debezium-connector-postgres + ${version.debezium} + + + org.testcontainers + testcontainers + + + org.infinispan + infinispan-core + 15.0.8.Final + provided + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + true + + + + build + generate-code + generate-code-tests + native-image-agent + + + + + + maven-compiler-plugin + ${compiler-plugin.version} + + true + + + + maven-surefire-plugin + ${surefire-plugin.version} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + maven-failsafe-plugin + ${surefire-plugin.version} + + + + integration-test + verify + + + + + + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager + ${maven.home} + + + + + io.fabric8 + docker-maven-plugin + 0.43.4 + + 500 + default + true + + + + debezium/postgres-server-test-database + + none + + postgresuser + postgrespw + inventory + -E UTF8 + en_US.utf8 + + + 5432:5432 + + + postgres + true + green + + + + (?s)PostgreSQL init process complete.*database system is ready to accept connections + + + + quay.io/debezium/example-postgres:latest + + ln -fs /usr/share/zoneinfo/US/Samoa /etc/localtime && echo timezone=US/Samoa >> /usr/share/postgresql/postgresql.conf.sample + + + + properties + override + + + + + + + + + + + native + + + native + + + + false + true + + + + diff --git a/cache-invalidation-2/resources/data/create-order-request.json b/cache-invalidation-2/resources/data/create-order-request.json new file mode 100644 index 00000000..b0313100 --- /dev/null +++ b/cache-invalidation-2/resources/data/create-order-request.json @@ -0,0 +1,5 @@ +{ + "customer" : "Billy-Bob", + "itemId" : 10003, + "quantity" : 2 +} diff --git a/cache-invalidation-2/resources/data/update-item-request.json b/cache-invalidation-2/resources/data/update-item-request.json new file mode 100644 index 00000000..67a17118 --- /dev/null +++ b/cache-invalidation-2/resources/data/update-item-request.json @@ -0,0 +1,4 @@ +{ + "description" : "North by Northwest - Director's Cut", + "price" : 17.99 +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java new file mode 100644 index 00000000..21630498 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java @@ -0,0 +1,60 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.model; + +import java.math.BigDecimal; + +import jakarta.persistence.Cacheable; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; + +@Entity +@Cacheable +public class Item { + + @Id + private long id; + private String description; + private BigDecimal price; + + public Item() { + } + + public Item(long id, String description, BigDecimal price) { + this.id = id; + this.description = description; + this.price = price; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public BigDecimal getPrice() { + return price; + } + + public void setPrice(BigDecimal price) { + this.price = price; + } + + @Override + public String toString() { + return "Item [id=" + id + ", description=" + description + ", price=" + price + "]"; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java new file mode 100644 index 00000000..4bcea079 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java @@ -0,0 +1,87 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.model; + +import java.math.BigDecimal; + +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; +import jakarta.persistence.ManyToOne; +import jakarta.persistence.SequenceGenerator; + +@Entity +public class PurchaseOrder { + + @Id + @GeneratedValue(generator = "sequence") + @SequenceGenerator( + name = "sequence", + sequenceName = "seq_po", + initialValue = 1001, + allocationSize = 50 + ) + private long id; + + private String customer; + + @ManyToOne + private Item item; + + private int quantity; + + private BigDecimal totalPrice; + + public PurchaseOrder() { + } + + public PurchaseOrder(String customer, Item item, int quantity, BigDecimal totalPrice) { + this.customer = customer; + this.item = item; + this.quantity = quantity; + this.totalPrice = totalPrice; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getCustomer() { + return customer; + } + + public void setCustomer(String customer) { + this.customer = customer; + } + + public Item getItem() { + return item; + } + + public void setItem(Item item) { + this.item = item; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + public BigDecimal getTotalPrice() { + return totalPrice; + } + + public void setTotalPrice(BigDecimal totalPrice) { + this.totalPrice = totalPrice; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java new file mode 100644 index 00000000..f4efee48 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java @@ -0,0 +1,128 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.persistence; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; + +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.PersistenceContext; + +import jakarta.persistence.PersistenceUnit; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.ConfigValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.data.Envelope.Operation; +import io.debezium.embedded.Connect; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.RecordChangeEvent; +import io.debezium.engine.format.ChangeEventFormat; +import io.debezium.examples.cacheinvalidation.model.Item; + +/** + * Listens to database changes using Debezium's embedded engine. If a change + * event for an {@link Item} arrives that has not been caused by this + * application itself, that {@code Item} will be removed from the JPA 2nd-level + * cache. + * + * @author Gunnar Morling + */ +@ApplicationScoped +public class DatabaseChangeEventListener { + + private static final String CONFIG_PREFIX = "quarkus.debezium-cdc."; + private static final Logger LOG = LoggerFactory.getLogger(DatabaseChangeEventListener.class); + + @PersistenceUnit + private EntityManagerFactory emf; + + @PersistenceContext + private EntityManager em; + + @Inject + private KnownTransactions knownTransactions; + + private DebeziumEngine engine; + private ExecutorService executorService; + + @Priority(2) + public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) { + LOG.info("Launching Debezium embedded engine"); + + final Properties properties = new Properties(); + for (String propertyName : ConfigProvider.getConfig().getPropertyNames()) { + if (propertyName.startsWith(CONFIG_PREFIX)) { + final String key = propertyName.replace(CONFIG_PREFIX, ""); + final ConfigValue value = ConfigProvider.getConfig().getConfigValue(propertyName); + properties.put(key, value.getRawValue()); + LOG.info("\t{}: {}", key, value.getRawValue()); + } + } + + final Configuration config = Configuration.empty() + .withSystemProperties(Function.identity()) + .edit() + .with(Configuration.from(properties)) + .build(); + + this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) + .using(config.asProperties()) + .notifying((list, recordCommitter) -> { + for (RecordChangeEvent record : list) { + handleDbChangeEvent(record.record()); + recordCommitter.markProcessed(record); + } + recordCommitter.markBatchFinished(); + }) + .build(); + + executorService = Executors.newFixedThreadPool(1); + executorService.execute(engine); + } + + @PreDestroy + public void shutdownEngine() throws Exception { + LOG.info("Stopping Debezium embedded engine"); + engine.close(); + executorService.shutdown(); + } + + private void handleDbChangeEvent(SourceRecord record) { + LOG.info("Handling DB change event " + record); + + if (record.topic().equals("dbserver1.public.item")) { + Long itemId = ((Struct) record.key()).getInt64("id"); + Struct payload = (Struct) record.value(); + Operation op = Operation.forCode(payload.getString("op")); + Long txId = ((Struct) payload.get("source")).getInt64("txId"); + + if (knownTransactions.isKnown(txId)) { + LOG.info("Not evicting item {} from 2nd-level cache as TX {} was started by this application", itemId, txId); + } + else if (op != Operation.UPDATE && op != Operation.DELETE) { + LOG.info("Not evicting item {} from 2nd-level cache as the change is neither an UPDATE nor a DELETE", itemId); + } + else { + LOG.info("Evicting item {} from 2nd-level cache as TX {} was not started by this application", itemId, txId); + emf.getCache().evict(Item.class, itemId); + } + } + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java new file mode 100644 index 00000000..bf77f805 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java @@ -0,0 +1,54 @@ +package io.debezium.examples.cacheinvalidation.persistence; + +import java.util.concurrent.TimeUnit; + +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; + +import org.infinispan.Cache; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.manager.DefaultCacheManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Keeps track of transactions initiated by this application, so we can tell + * them apart from transactions initiated externally, e.g. via other DB clients. + */ +@ApplicationScoped +public class KnownTransactions { + + private static final Logger LOG = LoggerFactory.getLogger( KnownTransactions.class ); + + private final DefaultCacheManager cacheManager; + private final Cache applicationTransactions; + + public KnownTransactions() { + cacheManager = new DefaultCacheManager(); + cacheManager.defineConfiguration( + "tx-id-cache", + new ConfigurationBuilder() + .simpleCache(true) + .expiration() + .lifespan(60, TimeUnit.SECONDS) + .build() + ); + + applicationTransactions = cacheManager.getCache("tx-id-cache"); + } + + @PreDestroy + public void stopCacheManager() { + cacheManager.stop(); + } + + public void register(long txId) { + LOG.info("Registering TX {} started by this application", txId); + applicationTransactions.put(txId, true); + } + + public boolean isKnown(long txId) { + return Boolean.TRUE.equals(applicationTransactions.get(txId)); + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java new file mode 100644 index 00000000..2bf9cdac --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java @@ -0,0 +1,39 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.persistence; + +import org.hibernate.boot.Metadata; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.event.service.spi.EventListenerRegistry; +import org.hibernate.event.spi.EventType; +import org.hibernate.integrator.spi.Integrator; +import org.hibernate.service.spi.SessionFactoryServiceRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integrator for Hibernate ORM that registers the {@link TransactionRegistrationListener}. + * + * @author Gunnar Morling + */ +public class TransactionRegistrationIntegrator implements Integrator { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionRegistrationIntegrator.class); + + + @Override + public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory, + SessionFactoryServiceRegistry serviceRegistry) { + LOG.info("TransactionRegistrationIntegrator#integrate()"); + + serviceRegistry.getService(EventListenerRegistry.class) + .appendListeners(EventType.FLUSH, new TransactionRegistrationListener()); + } + + @Override + public void disintegrate(SessionFactoryImplementor sessionFactory, SessionFactoryServiceRegistry serviceRegistry) { + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java new file mode 100644 index 00000000..fd1e8d31 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java @@ -0,0 +1,62 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.persistence; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import jakarta.enterprise.inject.spi.CDI; + +import org.hibernate.FlushMode; +import org.hibernate.HibernateException; +import org.hibernate.Session; +import org.hibernate.event.spi.FlushEvent; +import org.hibernate.event.spi.FlushEventListener; + +/** + * Hibernate event listener obtains the current TX id and stores it in a cache. + * + * @author Gunnar Morling + */ +class TransactionRegistrationListener implements FlushEventListener { + + private final ConcurrentMap sessionsWithBeforeTransactionCompletion; + + private volatile KnownTransactions knownTransactions; + + public TransactionRegistrationListener() { + sessionsWithBeforeTransactionCompletion = new ConcurrentHashMap<>(); + } + + @Override + public void onFlush(FlushEvent event) throws HibernateException { + if (sessionsWithBeforeTransactionCompletion.containsKey(event.getSession())) { + return; + } + + sessionsWithBeforeTransactionCompletion.put(event.getSession(), true); + + event.getSession().getActionQueue().registerProcess(session -> { + Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()") + .setHibernateFlushMode(FlushMode.MANUAL) + .getSingleResult(); + + getKnownTransactions().register(txId.longValue()); + + sessionsWithBeforeTransactionCompletion.remove(session); + }); + } + + private KnownTransactions getKnownTransactions() { + KnownTransactions value = knownTransactions; + + if (value == null) { + knownTransactions = value = CDI.current().select(KnownTransactions.class).get(); + } + + return value; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java new file mode 100644 index 00000000..3040c208 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java @@ -0,0 +1,41 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.PersistenceContext; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import io.debezium.examples.cacheinvalidation.model.Item; + +@Path("/cache") +@ApplicationScoped +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) +public class CacheResource { + + @PersistenceContext + private EntityManagerFactory entityManagerFactory; + + @DELETE + @Path("/item/{id}") + public void invalidateItemCacheEntry(@PathParam("id") long itemId) { + entityManagerFactory.getCache().evict(Item.class, itemId); + } + + @GET + @Path("/item/{id}") + public boolean isContained(@PathParam("id") long itemId) { + return entityManagerFactory.getCache().contains(Item.class, itemId); + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CreateOrderRequest.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CreateOrderRequest.java new file mode 100644 index 00000000..fa647b9e --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CreateOrderRequest.java @@ -0,0 +1,37 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +public class CreateOrderRequest { + + private String customer; + private long itemId; + private int quantity; + + public String getCustomer() { + return customer; + } + + public void setCustomer(String customer) { + this.customer = customer; + } + + public long getItemId() { + return itemId; + } + + public void setItemId(long itemId) { + this.itemId = itemId; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CreateOrderResponse.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CreateOrderResponse.java new file mode 100644 index 00000000..d054ec0d --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/CreateOrderResponse.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +import java.math.BigDecimal; + +import io.debezium.examples.cacheinvalidation.model.Item; + +public class CreateOrderResponse { + + private long id; + private String customer; + private Item item; + private int quantity; + private BigDecimal totalPrice; + + public CreateOrderResponse(long id, String customer, Item item, int quantity, BigDecimal totalPrice) { + this.id = id; + this.customer = customer; + this.item = item; + this.quantity = quantity; + this.totalPrice = totalPrice; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getCustomer() { + return customer; + } + + public void setCustomer(String customer) { + this.customer = customer; + } + + public Item getItem() { + return item; + } + + public void setItem(Item item) { + this.item = item; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + + public BigDecimal getTotalPrice() { + return totalPrice; + } + + public void setTotalPrice(BigDecimal totalPrice) { + this.totalPrice = totalPrice; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java new file mode 100644 index 00000000..8880e366 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java @@ -0,0 +1,62 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import jakarta.transaction.Transactional; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import io.debezium.examples.cacheinvalidation.model.Item; + +@Path("/items") +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) +@ApplicationScoped +public class ItemResource { + + @PersistenceContext + private EntityManager entityManager; + + @GET + @Path("/{id}") + public Response getItem(@PathParam("id") long id) { + Item item = entityManager.find(Item.class, id); + return Response.ok(item).build(); + } + + @PUT + @Transactional + @Path("/{id}") + public UpdateItemResponse addOrder(@PathParam("id") long id, UpdateItemRequest request) { + Item item = entityManager.find(Item.class, id); + + if (item == null) { + throw new NotFoundException("Item with id " + id + " doesn't exist"); + } + + UpdateItemResponse response = new UpdateItemResponse(); + response.setId(id); + response.setOldDescription(item.getDescription()); + response.setOldPrice(item.getPrice()); + response.setNewDescription(request.getDescription()); + response.setNewPrice(request.getPrice()); + + item.setDescription(request.getDescription()); + item.setPrice(request.getPrice()); + + return response; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java new file mode 100644 index 00000000..70f402ef --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java @@ -0,0 +1,53 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +import java.math.BigDecimal; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import jakarta.transaction.Transactional; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import io.debezium.examples.cacheinvalidation.model.Item; +import io.debezium.examples.cacheinvalidation.model.PurchaseOrder; + +@Path("/orders") +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) +@ApplicationScoped +public class OrderResource { + + @PersistenceContext + private EntityManager entityManager; + + @POST + @Transactional + public CreateOrderResponse addOrder(CreateOrderRequest orderRequest) { + Item item = entityManager.find(Item.class, orderRequest.getItemId()); + PurchaseOrder po = new PurchaseOrder( + orderRequest.getCustomer(), + item, + orderRequest.getQuantity(), + item.getPrice().multiply(BigDecimal.valueOf(orderRequest.getQuantity())) + ); + + po = entityManager.merge(po); + + return new CreateOrderResponse( + po.getId(), + po.getCustomer(), + po.getItem(), + po.getQuantity(), + po.getTotalPrice() + ); + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/UpdateItemRequest.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/UpdateItemRequest.java new file mode 100644 index 00000000..1cfe06cd --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/UpdateItemRequest.java @@ -0,0 +1,27 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +import java.math.BigDecimal; + +public class UpdateItemRequest { + + private String description; + private BigDecimal price; + + public String getDescription() { + return description; + } + public void setDescription(String description) { + this.description = description; + } + public BigDecimal getPrice() { + return price; + } + public void setPrice(BigDecimal price) { + this.price = price; + } +} diff --git a/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/UpdateItemResponse.java b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/UpdateItemResponse.java new file mode 100644 index 00000000..6e9ce209 --- /dev/null +++ b/cache-invalidation-2/src/main/java/io/debezium/examples/cacheinvalidation/rest/UpdateItemResponse.java @@ -0,0 +1,57 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation.rest; + +import java.math.BigDecimal; + +public class UpdateItemResponse { + + private long id; + private String oldDescription; + private BigDecimal oldPrice; + private String newDescription; + private BigDecimal newPrice; + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getOldDescription() { + return oldDescription; + } + + public void setOldDescription(String oldDescription) { + this.oldDescription = oldDescription; + } + + public BigDecimal getOldPrice() { + return oldPrice; + } + + public void setOldPrice(BigDecimal oldPrice) { + this.oldPrice = oldPrice; + } + + public String getNewDescription() { + return newDescription; + } + + public void setNewDescription(String newDescription) { + this.newDescription = newDescription; + } + + public BigDecimal getNewPrice() { + return newPrice; + } + + public void setNewPrice(BigDecimal newPrice) { + this.newPrice = newPrice; + } +} diff --git a/cache-invalidation-2/src/main/resources/META-INF/data.sql b/cache-invalidation-2/src/main/resources/META-INF/data.sql new file mode 100644 index 00000000..aee1c66a --- /dev/null +++ b/cache-invalidation-2/src/main/resources/META-INF/data.sql @@ -0,0 +1,3 @@ +INSERT INTO Item (id,description,price) VALUES (10001, 'The Birds', 9.99); +INSERT INTO Item (id,description,price) VALUES (10002, 'To Catch A Thieve', 12.99); +INSERT INTO Item (id,description,price) VALUES (10003, 'North By Northwest', 14.99); diff --git a/cache-invalidation-2/src/main/resources/META-INF/services/org.hibernate.integrator.spi.Integrator b/cache-invalidation-2/src/main/resources/META-INF/services/org.hibernate.integrator.spi.Integrator new file mode 100644 index 00000000..ca92e370 --- /dev/null +++ b/cache-invalidation-2/src/main/resources/META-INF/services/org.hibernate.integrator.spi.Integrator @@ -0,0 +1 @@ +io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator \ No newline at end of file diff --git a/cache-invalidation-2/src/main/resources/application.properties b/cache-invalidation-2/src/main/resources/application.properties new file mode 100644 index 00000000..98c820e8 --- /dev/null +++ b/cache-invalidation-2/src/main/resources/application.properties @@ -0,0 +1,25 @@ +# Datasource +quarkus.datasource.db-kind=pg +quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/inventory +quarkus.datasource.username=postgresuser +quarkus.datasource.password=postgrespw + +# Hibernate configuration +quarkus.hibernate-orm.log.sql=true +quarkus.hibernate-orm.log.format-sql=true +quarkus.hibernate-orm.sql-load-script=META-INF/data.sql +quarkus.hibernate-orm.database.generation=drop-and-create + +# Debezium CDC configuration +quarkus.debezium-cdc.connector.class=io.debezium.connector.postgresql.PostgresConnector +quarkus.debezium-cdc.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore +quarkus.debezium-cdc.name=cache-invalidation-connector +quarkus.debezium-cdc.database.hostname=localhost +quarkus.debezium-cdc.database.port=5432 +quarkus.debezium-cdc.database.user=postgresuser +quarkus.debezium-cdc.database.password=postgrespw +quarkus.debezium-cdc.database.dbname=inventory +quarkus.debezium-cdc.topic.prefix=dbserver1 +quarkus.debezium-cdc.table.include.list=public.item +quarkus.debezium-cdc.plugin.name=pgoutput +quarkus.debezium-cdc.snapshot.mode=never diff --git a/cache-invalidation-2/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java b/cache-invalidation-2/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java new file mode 100644 index 00000000..534a6f05 --- /dev/null +++ b/cache-invalidation-2/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java @@ -0,0 +1,117 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.examples.cacheinvalidation; + +import static io.restassured.RestAssured.get; +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@QuarkusTest +public class CacheInvalidationIT { + + @ConfigProperty(name = "quarkus.datasource.jdbc.url") + String jdbcUrl; + + @ConfigProperty(name = "quarkus.datasource.username") + String jdbcUserName; + + @ConfigProperty(name = "quarkus.datasource.password") + String jdbcPassword; + + @BeforeEach + public void prepareItem() { + updateItem(10003, "North by Northwest", 14.99F); + } + + @Test + public void shouldInvalidateCacheAfterDatabaseUpdate() throws Exception { + placeOrder(10003, 2, 29.98F); + + // update the item price directly in the DB + try(Connection conn = getDbConnection(); Statement statement = conn.createStatement()) { + statement.executeUpdate("UPDATE public.item SET price = 16.99 WHERE id = 10003;"); + } + + // cache should be invalidated + await().atMost(5, TimeUnit.SECONDS) + .until(() -> { + return !get("/cache/item/10003").as(boolean.class); + }); + + // and the item reloaded from the DB + placeOrder(10003, 2, 33.98F); + } + + @Test + public void shouldNotInvalidateCacheAfterUpdateThroughApplication() throws Exception { + placeOrder(10003, 2, 29.98F); + + // update the item price through application + updateItem(10003, "North by Northwest", 16.99F); + + // Theoretically an (unexpected) CDC event could also arrive after that time, + // but that seems to be as good as it gets + Thread.sleep(3000); + + // cache should not be invalidated + assertTrue(get("/cache/item/10003").as(boolean.class)); + } + + private void placeOrder(long itemId, int quantity, float expectedTotalPrice) { + given() + .contentType(ContentType.JSON) + .body( + "{\n" + + " \"customer\" : \"Billy-Bob\",\n" + + " \"itemId\" : " + itemId + ",\n" + + " \"quantity\" : " + quantity + "\n" + + "}" + ) + .when() + .post("/orders") + .then() + .body("totalPrice", equalTo(expectedTotalPrice)); + } + + private void updateItem(long itemId, String newDescription, float newPrice) { + given() + .contentType(ContentType.JSON) + .body( + "{\n" + + " \"description\" : \"" + newDescription + "\",\n" + + " \"price\" : " + newPrice + "\n" + + "}" + ) + .when() + .put("/items/{id}", itemId) + .then() + .statusCode(200); + } + + private Connection getDbConnection() throws SQLException { + final Properties props = new Properties(); + props.setProperty("user", jdbcUserName); + props.setProperty("password", jdbcPassword); + System.out.println(jdbcUrl); + System.out.println(props.entrySet()); + return DriverManager.getConnection(jdbcUrl, props); + } +} diff --git a/cache-invalidation/Dockerfile b/cache-invalidation/Dockerfile deleted file mode 100644 index a7ae12f8..00000000 --- a/cache-invalidation/Dockerfile +++ /dev/null @@ -1,26 +0,0 @@ -FROM jboss/wildfly:25.0.0.Final - -ADD resources/wildfly/customization /opt/jboss/wildfly/customization/ -RUN mkdir /tmp/pg-driver && cd /tmp/pg-driver && curl -sO https://jdbc.postgresql.org/download/postgresql-42.5.1.jar - -RUN mkdir /tmp/infinispan-modules && \ - cd /tmp/infinispan-modules && \ - curl -sO https://downloads.jboss.org/infinispan/11.0.6.Final/infinispan-wildfly-modules-11.0.6.Final.zip && \ - unzip infinispan-wildfly-modules-11.0.6.Final.zip && \ - cp -r infinispan-wildfly-modules-11.0.6.Final/modules/* /opt/jboss/wildfly/modules && \ - rm infinispan-wildfly-modules-11.0.6.Final.zip && \ - rm -rf infinispan-wildfly-modules-11.0.6.Final - -# Based on: -# https://goldmann.pl/blog/2014/07/23/customizing-the-configuration-of-the-wildfly-docker-image/ -# https://tomylab.wordpress.com/2016/07/24/how-to-add-a-datasource-to-wildfly/ -RUN /opt/jboss/wildfly/customization/execute.sh - -RUN /opt/jboss/wildfly/bin/add-user.sh admin secret - -ADD target/cache-invalidation.war /opt/jboss/wildfly/standalone/deployments/ - -# Fix for Error: Could not rename /opt/jboss/wildfly/standalone/configuration/standalone_xml_history/current -RUN rm -rf /opt/jboss/wildfly/standalone/configuration/standalone_xml_history - -CMD ["/opt/jboss/wildfly/bin/standalone.sh", "-b", "0.0.0.0", "-bmanagement", "0.0.0.0", "--debug"] diff --git a/cache-invalidation/README.md b/cache-invalidation/README.md index d407fc2c..6dee8dcb 100644 --- a/cache-invalidation/README.md +++ b/cache-invalidation/README.md @@ -3,98 +3,109 @@ This demo shows how Debezium can be used to invalidate items in the JPA 2nd level cache after external data changes, e.g. a manual record update in the database, bypassing the application layer. -The application runs on WildFly and uses Postgres as a database. +The application uses Quarkus, Hibernate, and PostgreSQL as a database. The domain model is centered around purchase orders of given items. The `Item` entity is marked as cacheable, i.e. after updates to an item (e.g. its base price), it must be purged from the 2nd-level cache in order to correctly calculate the price of future orders of that item. ## Manual Testing -To run the app, follow these steps: - - export DEBEZIUM_VERSION=2.1 - mvn clean package - docker-compose up --build - -Place an order for item 10003 using curl: - - curl -H "Content-Type: application/json" \ +1. First start the database container. + ```bash + mvn docker:start + ``` + This will start the `quay.io/debezium/example-postgres:latest` container that will be used to store our JPA entities and the database that the Debezium will capture changes from, too. + +2. Run the application using the Quarkus development mode. + ```bash + mvn clean quarkus:dev + ``` + The application will start in the development mode and run until you press `Ctrl+C` to stop the application. + +3. Place an order for item 1003 using curl: + ```bash + curl -H "Content-Type: application/json" \ -X POST \ --data @resources/data/create-order-request.json \ - http://localhost:8080/cache-invalidation/rest/orders - -Or, if [httpie](https://httpie.org/) is your preferred CLI HTTP client: - - cat resources/data/create-order-request.json | http POST http://localhost:8080/cache-invalidation/rest/orders - -Update the price of item 10003 directly in the database: - - docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"' - -Use the application's REST API to verify that the item has been purged from the cache: - - curl -H "Content-Type: application/json" \ + http://localhost:8080/rest/orders + ``` + Or, if [httpie](https://httpie.org/) is your preferred CLI HTTP client: + ```bash + cat resources/data/create-order-request.json | http POST http://localhost:8080/rest/orders + ``` + +4. Update the price of item 10003 directly in the database: + ```bash + docker exec postgres-server-test-database-1 bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"' + ``` + +5. Now use the REST endpoint to verify that the item has been purged from the cache: + ```bash + curl -H "Content-Type: application/json" \ -X GET \ - http://localhost:8080/cache-invalidation/rest/cache/item/10003 - -Or via httpie: - - http GET http://localhost:8080/cache-invalidation/rest/cache/item/10003 - -Place another order of that item and observe how the calculated total price reflects the change applied above. -Also observe in the application's log how the `item` table is queried. - -Now update the item again, using the application's REST API this time: - - curl -H "Content-Type: application/json" \ + http://localhost:8080/rest/cache/item/10003 + ``` + or via httpie: + ```bash + http GET http://localhost:8080/rest/cache/item/10003 + ``` + +6. Place another order of that item and observe how the calculated total price reflects the change applied above. + Also observe the application's log how the `item` table is queried. + +7. Now, update the item again using the application's REST endpoint this time: + ```bash + curl -H "Content-Type: application/json" \ -X PUT \ --data @resources/data/update-item-request.json \ - http://localhost:8080/cache-invalidation/rest/items/10003 - -Or via httpie: - - cat resources/data/update-item-request.json | http PUT http://localhost:8080/cache-invalidation/rest/items/10003 - -The Debezium event handler will detect that this transaction is issued by the application itself, resulting in the item to not be removed from the cache: - - curl -H "Content-Type: application/json" \ + http://localhost:8080/rest/items/10003 + ``` + or via httpie: + ```bash + cat resources/data/update-item-request.json | http PUT http://localhost:8080/rest/items/10003 + ``` + +8. The Debezium CDC event handler detects this transaction is issued by the application, which results in the item not being removed from the cache: + You can test this use case using curl: + ```bash + curl -H "Content-Type: application/json" \ -X GET \ - http://localhost:8080/cache-invalidation/rest/cache/item/10003 - -Or via httpie: - - http GET http://localhost:8080/cache-invalidation/rest/cache/item/10003 - -If you place yet another order, you'll see how the `Item` entity is obtained from the cache, avoiding the roundtrip to the database. - -Finally, shut down database and application server: - - docker-compose down - -## Build + http://localhost:8080/rest/cache/item/10003 + ``` + or using httpie: + ```bash + http GET http://localhost:8080/rest/cache/item/10003 + ``` -Run +9. If you place another order, the `Item` entity is obtained from the cache, avoiding the database round-trip. - mvn clean package +10. Press `Ctrl+C` in the terminal to stop the Quarkus running application. + +11. Execute `mvn docker:stop`, to stop the PostgreSQL database container. -This will build the application, deploy it to WildFly via Docker and run an integration test against it. ## Development -During development, start up database and WildFly like so: - - mvn docker:build docker:start - -After code changes the application can be re-deployed like so: - - mvn wildfly:redeploy - -To get a session in Postgres run: - - docker run -it --rm --link postgres-1:postgres quay.io/debezium/example-postgres:${DEBEZIUM_VERSION} psql -h postgres -U postgresuser --dbname inventory - -Run - - mvn docker:stop - -to shut down all the started containers. +### Start-up steps: +1. +2. First start the PostgreSQL database container using maven: + ```bash + mvn docker:start + ``` + +2. Next start the application in Quarkus development mode: + ```bash + mvn clean quarkus:dev + ``` + +### Accessing the database + +To obtain a database session in PostgreSQL, run: +```bash +docker run -it --rm --link postgres-1:postgres quay.io/debezium/example-postgres:latest psql -h postgres -U postgresuser --dbname inventory +``` + +### Shutdown steps + +1. Stop the Quarkus application by pressing `Ctrl+C` in the terminal. +2. Execute `mvn docker:stop` to stop the PostgreSQL database container. diff --git a/cache-invalidation/docker-compose.yaml b/cache-invalidation/docker-compose.yaml deleted file mode 100644 index 0e9e257a..00000000 --- a/cache-invalidation/docker-compose.yaml +++ /dev/null @@ -1,20 +0,0 @@ -version: '2' -services: - postgres: - image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION} - ports: - - "5432:5432" - environment: - - POSTGRES_USER=postgresuser - - POSTGRES_PASSWORD=postgrespw - - POSTGRES_DB=inventory - order-manager: - image: debezium-examples/cache-invalidation:latest - build: - context: . - links: - - postgres - ports: - - 8080:8080 - - 8787:8787 - - 9990:9990 diff --git a/cache-invalidation/pom.xml b/cache-invalidation/pom.xml index a2c4e3c8..e9cb54d1 100644 --- a/cache-invalidation/pom.xml +++ b/cache-invalidation/pom.xml @@ -6,31 +6,57 @@ io.debezium.examples cache-invalidation 1.0-SNAPSHOT - war + jar cache-invalidation - false + 3.13.0 + 21 UTF-8 - 11 - 11 - 2.1.3.Final - 2.1 - 5.4.24.Final - 11.0.6.Final - 3.3.2 + UTF-8 + quarkus-bom + io.quarkus.platform + 3.15.1 + true + 3.3.1 + 15.0.8.Final + 2.7.3.Final + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + - javax - javaee-api - 7.0 - provided + io.quarkus + quarkus-rest + + + io.quarkus + quarkus-hibernate-orm + + + io.quarkus + quarkus-rest-jackson + + + io.quarkus + quarkus-jdbc-postgresql + + + io.quarkus + quarkus-arc io.debezium @@ -43,62 +69,70 @@ ${version.debezium} - org.hibernate - hibernate-core - ${version.hibernate} - provided + org.testcontainers + testcontainers org.infinispan infinispan-core - ${version.infinispan} + 15.0.8.Final provided - junit - junit - 4.13.1 + io.quarkus + quarkus-junit5 test io.rest-assured rest-assured - 3.2.0 test - - org.postgresql - postgresql - 42.7.2 - org.awaitility awaitility - 3.1.3 test - cache-invalidation - - - src/test/resources - true - - - org.apache.maven.plugins - maven-war-plugin - ${mvn.war.version} + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + true + + + + build + generate-code + generate-code-tests + native-image-agent + + + + + + maven-compiler-plugin + ${compiler-plugin.version} + + true + + + + maven-surefire-plugin + ${surefire-plugin.version} - false + + org.jboss.logmanager.LogManager + ${maven.home} + - org.apache.maven.plugins maven-failsafe-plugin + ${surefire-plugin.version} @@ -107,96 +141,62 @@ + + + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager + ${maven.home} + + io.fabric8 docker-maven-plugin + 0.43.4 + 500 + default + true - db - quay.io/debezium/example-postgres:${version.debezium.tag} + + debezium/postgres-server-test-database postgresuser postgrespw inventory + -E UTF8 + en_US.utf8 - - database system is ready to accept connections - 5432:5432 - postgres - - - - debezium-examples/cache-invalidation - order-manager - - ${project.basedir} - - - - db - - - 8080:8080 - 8787:8787 - 9990:9990 - + + postgres + true + green + - WildFly Full .* \(WildFly Core .*\) started + (?s)PostgreSQL init process complete.*database system is ready to accept connections + + quay.io/debezium/example-postgres:latest + + ln -fs /usr/share/zoneinfo/US/Samoa /etc/localtime && echo timezone=US/Samoa >> /usr/share/postgresql/postgresql.conf.sample + + + + properties + override + - - - start - pre-integration-test - - build - start - - - - stop - post-integration-test - - stop - - - - - - - - io.fabric8 - docker-maven-plugin - 0.27.2 - - - org.apache.maven.plugins - maven-failsafe-plugin - 3.0.0-M1 - - - org.wildfly.plugins - wildfly-maven-plugin - 2.0.0.Final - - localhost - admin - secret - - - - + diff --git a/cache-invalidation/resources/wildfly/customization/commands.cli b/cache-invalidation/resources/wildfly/customization/commands.cli deleted file mode 100644 index 3310a489..00000000 --- a/cache-invalidation/resources/wildfly/customization/commands.cli +++ /dev/null @@ -1,14 +0,0 @@ -# Mark the commands below to be run as a batch -batch - -# Add module -module add --name=org.postgres --resources=/tmp/pg-driver/postgresql-42.3.3.jar --dependencies=javax.api,javax.transaction.api - -# Add PostgreSQL driver -/subsystem=datasources/jdbc-driver=postgres:add(driver-name=postgres,driver-module-name=org.postgres,driver-class-name=org.postgresql.Driver) - -# Add the datasource -data-source add --name=OrderDS --driver-name=postgres --jndi-name=java:jboss/datasources/OrderDS --connection-url=jdbc:postgresql://postgres:5432/inventory --user-name=postgresuser --password=postgrespw --valid-connection-checker-class-name=org.jboss.jca.adapters.jdbc.extensions.postgres.PostgreSQLValidConnectionChecker --exception-sorter-class-name=org.jboss.jca.adapters.jdbc.extensions.postgres.PostgreSQLExceptionSorter - -# Execute the batch -run-batch diff --git a/cache-invalidation/resources/wildfly/customization/execute.sh b/cache-invalidation/resources/wildfly/customization/execute.sh deleted file mode 100755 index b559cf40..00000000 --- a/cache-invalidation/resources/wildfly/customization/execute.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -# Usage: execute.sh [WildFly mode] [configuration file] -# -# The default mode is 'standalone' and default configuration is based on the -# mode. It can be 'standalone.xml' or 'domain.xml'. - -JBOSS_HOME=/opt/jboss/wildfly -JBOSS_CLI=$JBOSS_HOME/bin/jboss-cli.sh -JBOSS_MODE=${1:-"standalone"} -JBOSS_CONFIG=${2:-"$JBOSS_MODE.xml"} - -function wait_for_server() { - until `$JBOSS_CLI -c "ls /deployment" &> /dev/null`; do - sleep 1 - done -} - -echo "=> Starting WildFly server" -$JBOSS_HOME/bin/$JBOSS_MODE.sh -c $JBOSS_CONFIG > /dev/null & - -echo "=> Waiting for the server to boot" -wait_for_server - -echo "=> Executing the commands" -$JBOSS_CLI -c --file=`dirname "$0"`/commands.cli - -echo "=> Shutting down WildFly" -if [ "$JBOSS_MODE" = "standalone" ]; then - $JBOSS_CLI -c ":shutdown" -else - $JBOSS_CLI -c "/host=*:shutdown" -fi diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java index 318ec1be..21630498 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java @@ -7,9 +7,9 @@ import java.math.BigDecimal; -import javax.persistence.Cacheable; -import javax.persistence.Entity; -import javax.persistence.Id; +import jakarta.persistence.Cacheable; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; @Entity @Cacheable diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java index 4d6f169d..4bcea079 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java @@ -7,11 +7,11 @@ import java.math.BigDecimal; -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.Id; -import javax.persistence.ManyToOne; -import javax.persistence.SequenceGenerator; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; +import jakarta.persistence.ManyToOne; +import jakarta.persistence.SequenceGenerator; @Entity public class PurchaseOrder { diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java index 4998d1b0..f4efee48 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java @@ -5,32 +5,35 @@ */ package io.debezium.examples.cacheinvalidation.persistence; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; -import javax.annotation.PreDestroy; -import javax.annotation.Resource; -import javax.enterprise.concurrent.ManagedExecutorService; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.context.Initialized; -import javax.enterprise.event.Observes; -import javax.inject.Inject; -import javax.persistence.EntityManager; -import javax.persistence.EntityManagerFactory; -import javax.persistence.PersistenceContext; -import javax.persistence.PersistenceUnit; - +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.persistence.EntityManager; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.PersistenceContext; + +import jakarta.persistence.PersistenceUnit; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.ConfigValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; -import io.debezium.connector.postgresql.PostgresConnector; -import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.data.Envelope.Operation; -import io.debezium.embedded.EmbeddedEngine; +import io.debezium.embedded.Connect; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.RecordChangeEvent; +import io.debezium.engine.format.ChangeEventFormat; import io.debezium.examples.cacheinvalidation.model.Item; /** @@ -44,10 +47,8 @@ @ApplicationScoped public class DatabaseChangeEventListener { - private static final Logger LOG = LoggerFactory.getLogger( DatabaseChangeEventListener.class ); - - @Resource - private ManagedExecutorService executorService; + private static final String CONFIG_PREFIX = "quarkus.debezium-cdc."; + private static final Logger LOG = LoggerFactory.getLogger(DatabaseChangeEventListener.class); @PersistenceUnit private EntityManagerFactory emf; @@ -55,43 +56,52 @@ public class DatabaseChangeEventListener { @PersistenceContext private EntityManager em; - private EmbeddedEngine engine; - @Inject private KnownTransactions knownTransactions; + private DebeziumEngine engine; + private ExecutorService executorService; + + @Priority(2) public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) { LOG.info("Launching Debezium embedded engine"); - Configuration config = Configuration.empty() - .withSystemProperties(Function.identity()).edit() - .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class) - .with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine") - .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class) - .with("name", "cache-invalidation-connector") - .with("database.hostname", "postgres") - .with("database.port", 5432) - .with("database.user", "postgresuser") - .with("database.password", "postgrespw") - .with("topic.prefix", "dbserver1") - .with("database.dbname", "inventory") - .with("table.include.list", "public.item") - .with("plugin.name", "pgoutput") - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + final Properties properties = new Properties(); + for (String propertyName : ConfigProvider.getConfig().getPropertyNames()) { + if (propertyName.startsWith(CONFIG_PREFIX)) { + final String key = propertyName.replace(CONFIG_PREFIX, ""); + final ConfigValue value = ConfigProvider.getConfig().getConfigValue(propertyName); + properties.put(key, value.getRawValue()); + LOG.info("\t{}: {}", key, value.getRawValue()); + } + } + + final Configuration config = Configuration.empty() + .withSystemProperties(Function.identity()) + .edit() + .with(Configuration.from(properties)) .build(); - this.engine = EmbeddedEngine.create() - .using(config) - .notifying(this::handleDbChangeEvent) + this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) + .using(config.asProperties()) + .notifying((list, recordCommitter) -> { + for (RecordChangeEvent record : list) { + handleDbChangeEvent(record.record()); + recordCommitter.markProcessed(record); + } + recordCommitter.markBatchFinished(); + }) .build(); + executorService = Executors.newFixedThreadPool(1); executorService.execute(engine); } @PreDestroy - public void shutdownEngine() { + public void shutdownEngine() throws Exception { LOG.info("Stopping Debezium embedded engine"); - engine.stop(); + engine.close(); + executorService.shutdown(); } private void handleDbChangeEvent(SourceRecord record) { diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java index dcc06693..bf77f805 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java @@ -2,12 +2,13 @@ import java.util.concurrent.TimeUnit; -import javax.annotation.PreDestroy; -import javax.enterprise.context.ApplicationScoped; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; import org.infinispan.Cache; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.manager.DefaultCacheManager; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,11 +29,11 @@ public KnownTransactions() { cacheManager.defineConfiguration( "tx-id-cache", new ConfigurationBuilder() - .simpleCache(true) - .expiration() + .simpleCache(true) + .expiration() .lifespan(60, TimeUnit.SECONDS) - .build() - ); + .build() + ); applicationTransactions = cacheManager.getCache("tx-id-cache"); } diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java index 92518c82..0c0a7706 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java @@ -26,11 +26,11 @@ public class TransactionRegistrationIntegrator implements Integrator { @Override public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory, - SessionFactoryServiceRegistry serviceRegistry) { + SessionFactoryServiceRegistry serviceRegistry) { LOG.info("TransactionRegistrationIntegrator#integrate()"); serviceRegistry.getService(EventListenerRegistry.class) - .appendListeners(EventType.FLUSH, new TransactionRegistrationListener()); + .appendListeners(EventType.FLUSH, new TransactionRegistrationListener()); } @Override diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java index 6e170c38..71f3346a 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java @@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import javax.enterprise.inject.spi.CDI; +import jakarta.enterprise.inject.spi.CDI; import org.hibernate.FlushMode; import org.hibernate.HibernateException; @@ -23,8 +23,6 @@ */ class TransactionRegistrationListener implements FlushEventListener { - private static final long serialVersionUID = 1L; - private final ConcurrentMap sessionsWithBeforeTransactionCompletion; private volatile KnownTransactions knownTransactions; @@ -43,7 +41,7 @@ public void onFlush(FlushEvent event) throws HibernateException { event.getSession().getActionQueue().registerProcess( session -> { Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()") - .setFlushMode(FlushMode.MANUAL) + .setHibernateFlushMode(FlushMode.MANUAL) .getSingleResult(); getKnownTransactions().register(txId.longValue()); @@ -52,7 +50,7 @@ public void onFlush(FlushEvent event) throws HibernateException { } ); } - private KnownTransactions getKnownTransactions() { + private KnownTransactions getKnownTransactions() { KnownTransactions value = knownTransactions; if (value == null) { diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java index 7ebb3388..6aef7ebb 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java @@ -5,16 +5,16 @@ */ package io.debezium.examples.cacheinvalidation.rest; -import javax.enterprise.context.ApplicationScoped; -import javax.persistence.EntityManagerFactory; -import javax.persistence.PersistenceUnit; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.PersistenceUnit; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; import io.debezium.examples.cacheinvalidation.model.Item; diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java index e1c3f21e..03e1e822 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java @@ -5,17 +5,17 @@ */ package io.debezium.examples.cacheinvalidation.rest; -import javax.enterprise.context.ApplicationScoped; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; -import javax.transaction.Transactional; -import javax.ws.rs.Consumes; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import jakarta.transaction.Transactional; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; import io.debezium.examples.cacheinvalidation.model.Item; diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java index 3b77f1d2..70f402ef 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java @@ -7,15 +7,15 @@ import java.math.BigDecimal; -import javax.enterprise.context.ApplicationScoped; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; -import javax.transaction.Transactional; -import javax.ws.rs.Consumes; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; +import jakarta.transaction.Transactional; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; import io.debezium.examples.cacheinvalidation.model.Item; import io.debezium.examples.cacheinvalidation.model.PurchaseOrder; diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java index c07cdd4d..4149d891 100644 --- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java +++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java @@ -8,8 +8,8 @@ import java.util.HashSet; import java.util.Set; -import javax.ws.rs.ApplicationPath; -import javax.ws.rs.core.Application; +import jakarta.ws.rs.ApplicationPath; +import jakarta.ws.rs.core.Application; @ApplicationPath("/rest") public class RestApplication extends Application { diff --git a/cache-invalidation/src/main/resources/META-INF/data.sql b/cache-invalidation/src/main/resources/META-INF/data.sql index 34187558..aee1c66a 100644 --- a/cache-invalidation/src/main/resources/META-INF/data.sql +++ b/cache-invalidation/src/main/resources/META-INF/data.sql @@ -1,3 +1,3 @@ -INSERT INTO Item VALUES (10001, 'The Birds', 9.99); -INSERT INTO Item VALUES (10002, 'To Catch A Thieve', 12.99); -INSERT INTO Item VALUES (10003, 'North By Northwest', 14.99); +INSERT INTO Item (id,description,price) VALUES (10001, 'The Birds', 9.99); +INSERT INTO Item (id,description,price) VALUES (10002, 'To Catch A Thieve', 12.99); +INSERT INTO Item (id,description,price) VALUES (10003, 'North By Northwest', 14.99); diff --git a/cache-invalidation/src/main/resources/META-INF/persistence.xml b/cache-invalidation/src/main/resources/META-INF/persistence.xml deleted file mode 100644 index 48bf239d..00000000 --- a/cache-invalidation/src/main/resources/META-INF/persistence.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - java:jboss/datasources/OrderDS - ENABLE_SELECTIVE - - - - - - - - - - - - - diff --git a/cache-invalidation/src/main/resources/application.properties b/cache-invalidation/src/main/resources/application.properties new file mode 100644 index 00000000..98c820e8 --- /dev/null +++ b/cache-invalidation/src/main/resources/application.properties @@ -0,0 +1,25 @@ +# Datasource +quarkus.datasource.db-kind=pg +quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/inventory +quarkus.datasource.username=postgresuser +quarkus.datasource.password=postgrespw + +# Hibernate configuration +quarkus.hibernate-orm.log.sql=true +quarkus.hibernate-orm.log.format-sql=true +quarkus.hibernate-orm.sql-load-script=META-INF/data.sql +quarkus.hibernate-orm.database.generation=drop-and-create + +# Debezium CDC configuration +quarkus.debezium-cdc.connector.class=io.debezium.connector.postgresql.PostgresConnector +quarkus.debezium-cdc.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore +quarkus.debezium-cdc.name=cache-invalidation-connector +quarkus.debezium-cdc.database.hostname=localhost +quarkus.debezium-cdc.database.port=5432 +quarkus.debezium-cdc.database.user=postgresuser +quarkus.debezium-cdc.database.password=postgrespw +quarkus.debezium-cdc.database.dbname=inventory +quarkus.debezium-cdc.topic.prefix=dbserver1 +quarkus.debezium-cdc.table.include.list=public.item +quarkus.debezium-cdc.plugin.name=pgoutput +quarkus.debezium-cdc.snapshot.mode=never diff --git a/cache-invalidation/src/main/webapp/WEB-INF/beans.xml b/cache-invalidation/src/main/webapp/WEB-INF/beans.xml deleted file mode 100644 index 7a29cf42..00000000 --- a/cache-invalidation/src/main/webapp/WEB-INF/beans.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - diff --git a/cache-invalidation/src/main/webapp/WEB-INF/jboss-deployment-structure.xml b/cache-invalidation/src/main/webapp/WEB-INF/jboss-deployment-structure.xml deleted file mode 100644 index 17da62d0..00000000 --- a/cache-invalidation/src/main/webapp/WEB-INF/jboss-deployment-structure.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - diff --git a/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java b/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java index 8807a6bc..261c9062 100644 --- a/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java +++ b/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java @@ -18,14 +18,27 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; +import io.quarkus.test.junit.QuarkusTest; +import org.eclipse.microprofile.config.inject.ConfigProperty; import io.restassured.http.ContentType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@QuarkusTest public class CacheInvalidationIT { - @Before + @ConfigProperty(name = "quarkus.datasource.jdbc.url") + String jdbcUrl; + + @ConfigProperty(name = "quarkus.datasource.username") + String jdbcUserName; + + @ConfigProperty(name = "quarkus.datasource.password") + String jdbcPassword; + + @BeforeEach public void prepareItem() { updateItem(10003, "North by Northwest", 14.99F); } @@ -42,7 +55,7 @@ public void shouldInvalidateCacheAfterDatabaseUpdate() throws Exception { // cache should be invalidated await().atMost(5, TimeUnit.SECONDS) .until(() -> { - return !get("/cache-invalidation/rest/cache/item/10003").as(boolean.class); + return !get("/rest/cache/item/10003").as(boolean.class); }); // and the item reloaded from the DB @@ -61,7 +74,7 @@ public void shouldNotInvalidateCacheAfterUpdateThroughApplication() throws Excep Thread.sleep(3000); // cache should not be invalidated - assertTrue(get("/cache-invalidation/rest/cache/item/10003").as(boolean.class)); + assertTrue(get("/rest/cache/item/10003").as(boolean.class)); } private void placeOrder(long itemId, int quantity, float expectedTotalPrice) { @@ -75,7 +88,7 @@ private void placeOrder(long itemId, int quantity, float expectedTotalPrice) { "}" ) .when() - .post("/cache-invalidation/rest/orders") + .post("/rest/orders") .then() .body("totalPrice", equalTo(expectedTotalPrice)); } @@ -90,17 +103,16 @@ private void updateItem(long itemId, String newDescription, float newPrice) { "}" ) .when() - .put("/cache-invalidation/rest/items/{id}", itemId) + .put("/rest/items/{id}", itemId) .then() .statusCode(200); } private Connection getDbConnection() throws SQLException { - String url = "jdbc:postgresql://localhost/inventory"; - Properties props = new Properties(); - props.setProperty("user","postgresuser"); - props.setProperty("password","postgrespw"); + final Properties props = new Properties(); + props.setProperty("user", jdbcUserName); + props.setProperty("password", jdbcPassword); - return DriverManager.getConnection(url, props); + return DriverManager.getConnection(jdbcUrl, props); } } diff --git a/cache-invalidation/src/test/resources/log4j.properties b/cache-invalidation/src/test/resources/log4j.properties deleted file mode 100644 index 6c5374ed..00000000 --- a/cache-invalidation/src/test/resources/log4j.properties +++ /dev/null @@ -1,11 +0,0 @@ -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n - -# Root logger option -log4j.rootLogger=DEBUG, stdout - -# Set up the default logging to be DEBUG level, then override specific units -# log4j.logger.io.debezium=INFO