diff --git a/.github/workflows/cache-invalidation-workflow.yml b/.github/workflows/cache-invalidation-workflow.yml
index ed19fc94..796b2493 100644
--- a/.github/workflows/cache-invalidation-workflow.yml
+++ b/.github/workflows/cache-invalidation-workflow.yml
@@ -14,6 +14,11 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
+ - name: Set up Java
+ uses: actions/setup-java@v4
+ with:
+ distribution: "temurin"
+ java-version: 21
- uses: actions/checkout@v3
- name: Cache local Maven repository
uses: actions/cache@v2
@@ -23,4 +28,4 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
- name: Check changes in [cache-invalidation] example
- run: cd cache-invalidation && mvn clean package -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+ run: cd cache-invalidation && mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
diff --git a/cache-invalidation/Dockerfile b/cache-invalidation/Dockerfile
deleted file mode 100644
index a7ae12f8..00000000
--- a/cache-invalidation/Dockerfile
+++ /dev/null
@@ -1,26 +0,0 @@
-FROM jboss/wildfly:25.0.0.Final
-
-ADD resources/wildfly/customization /opt/jboss/wildfly/customization/
-RUN mkdir /tmp/pg-driver && cd /tmp/pg-driver && curl -sO https://jdbc.postgresql.org/download/postgresql-42.5.1.jar
-
-RUN mkdir /tmp/infinispan-modules && \
- cd /tmp/infinispan-modules && \
- curl -sO https://downloads.jboss.org/infinispan/11.0.6.Final/infinispan-wildfly-modules-11.0.6.Final.zip && \
- unzip infinispan-wildfly-modules-11.0.6.Final.zip && \
- cp -r infinispan-wildfly-modules-11.0.6.Final/modules/* /opt/jboss/wildfly/modules && \
- rm infinispan-wildfly-modules-11.0.6.Final.zip && \
- rm -rf infinispan-wildfly-modules-11.0.6.Final
-
-# Based on:
-# https://goldmann.pl/blog/2014/07/23/customizing-the-configuration-of-the-wildfly-docker-image/
-# https://tomylab.wordpress.com/2016/07/24/how-to-add-a-datasource-to-wildfly/
-RUN /opt/jboss/wildfly/customization/execute.sh
-
-RUN /opt/jboss/wildfly/bin/add-user.sh admin secret
-
-ADD target/cache-invalidation.war /opt/jboss/wildfly/standalone/deployments/
-
-# Fix for Error: Could not rename /opt/jboss/wildfly/standalone/configuration/standalone_xml_history/current
-RUN rm -rf /opt/jboss/wildfly/standalone/configuration/standalone_xml_history
-
-CMD ["/opt/jboss/wildfly/bin/standalone.sh", "-b", "0.0.0.0", "-bmanagement", "0.0.0.0", "--debug"]
diff --git a/cache-invalidation/README.md b/cache-invalidation/README.md
index d407fc2c..6dee8dcb 100644
--- a/cache-invalidation/README.md
+++ b/cache-invalidation/README.md
@@ -3,98 +3,109 @@
This demo shows how Debezium can be used to invalidate items in the JPA 2nd level cache after external data changes,
e.g. a manual record update in the database, bypassing the application layer.
-The application runs on WildFly and uses Postgres as a database.
+The application uses Quarkus, Hibernate, and PostgreSQL as a database.
The domain model is centered around purchase orders of given items.
The `Item` entity is marked as cacheable, i.e. after updates to an item (e.g. its base price),
it must be purged from the 2nd-level cache in order to correctly calculate the price of future orders of that item.
## Manual Testing
-To run the app, follow these steps:
-
- export DEBEZIUM_VERSION=2.1
- mvn clean package
- docker-compose up --build
-
-Place an order for item 10003 using curl:
-
- curl -H "Content-Type: application/json" \
+1. First start the database container.
+ ```bash
+ mvn docker:start
+ ```
+ This will start the `quay.io/debezium/example-postgres:latest` container that will be used to store our JPA entities and the database that the Debezium will capture changes from, too.
+
+2. Run the application using the Quarkus development mode.
+ ```bash
+ mvn clean quarkus:dev
+ ```
+ The application will start in the development mode and run until you press `Ctrl+C` to stop the application.
+
+3. Place an order for item 1003 using curl:
+ ```bash
+ curl -H "Content-Type: application/json" \
-X POST \
--data @resources/data/create-order-request.json \
- http://localhost:8080/cache-invalidation/rest/orders
-
-Or, if [httpie](https://httpie.org/) is your preferred CLI HTTP client:
-
- cat resources/data/create-order-request.json | http POST http://localhost:8080/cache-invalidation/rest/orders
-
-Update the price of item 10003 directly in the database:
-
- docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"'
-
-Use the application's REST API to verify that the item has been purged from the cache:
-
- curl -H "Content-Type: application/json" \
+ http://localhost:8080/rest/orders
+ ```
+ Or, if [httpie](https://httpie.org/) is your preferred CLI HTTP client:
+ ```bash
+ cat resources/data/create-order-request.json | http POST http://localhost:8080/rest/orders
+ ```
+
+4. Update the price of item 10003 directly in the database:
+ ```bash
+ docker exec postgres-server-test-database-1 bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"'
+ ```
+
+5. Now use the REST endpoint to verify that the item has been purged from the cache:
+ ```bash
+ curl -H "Content-Type: application/json" \
-X GET \
- http://localhost:8080/cache-invalidation/rest/cache/item/10003
-
-Or via httpie:
-
- http GET http://localhost:8080/cache-invalidation/rest/cache/item/10003
-
-Place another order of that item and observe how the calculated total price reflects the change applied above.
-Also observe in the application's log how the `item` table is queried.
-
-Now update the item again, using the application's REST API this time:
-
- curl -H "Content-Type: application/json" \
+ http://localhost:8080/rest/cache/item/10003
+ ```
+ or via httpie:
+ ```bash
+ http GET http://localhost:8080/rest/cache/item/10003
+ ```
+
+6. Place another order of that item and observe how the calculated total price reflects the change applied above.
+ Also observe the application's log how the `item` table is queried.
+
+7. Now, update the item again using the application's REST endpoint this time:
+ ```bash
+ curl -H "Content-Type: application/json" \
-X PUT \
--data @resources/data/update-item-request.json \
- http://localhost:8080/cache-invalidation/rest/items/10003
-
-Or via httpie:
-
- cat resources/data/update-item-request.json | http PUT http://localhost:8080/cache-invalidation/rest/items/10003
-
-The Debezium event handler will detect that this transaction is issued by the application itself, resulting in the item to not be removed from the cache:
-
- curl -H "Content-Type: application/json" \
+ http://localhost:8080/rest/items/10003
+ ```
+ or via httpie:
+ ```bash
+ cat resources/data/update-item-request.json | http PUT http://localhost:8080/rest/items/10003
+ ```
+
+8. The Debezium CDC event handler detects this transaction is issued by the application, which results in the item not being removed from the cache:
+ You can test this use case using curl:
+ ```bash
+ curl -H "Content-Type: application/json" \
-X GET \
- http://localhost:8080/cache-invalidation/rest/cache/item/10003
-
-Or via httpie:
-
- http GET http://localhost:8080/cache-invalidation/rest/cache/item/10003
-
-If you place yet another order, you'll see how the `Item` entity is obtained from the cache, avoiding the roundtrip to the database.
-
-Finally, shut down database and application server:
-
- docker-compose down
-
-## Build
+ http://localhost:8080/rest/cache/item/10003
+ ```
+ or using httpie:
+ ```bash
+ http GET http://localhost:8080/rest/cache/item/10003
+ ```
-Run
+9. If you place another order, the `Item` entity is obtained from the cache, avoiding the database round-trip.
- mvn clean package
+10. Press `Ctrl+C` in the terminal to stop the Quarkus running application.
+
+11. Execute `mvn docker:stop`, to stop the PostgreSQL database container.
-This will build the application, deploy it to WildFly via Docker and run an integration test against it.
## Development
-During development, start up database and WildFly like so:
-
- mvn docker:build docker:start
-
-After code changes the application can be re-deployed like so:
-
- mvn wildfly:redeploy
-
-To get a session in Postgres run:
-
- docker run -it --rm --link postgres-1:postgres quay.io/debezium/example-postgres:${DEBEZIUM_VERSION} psql -h postgres -U postgresuser --dbname inventory
-
-Run
-
- mvn docker:stop
-
-to shut down all the started containers.
+### Start-up steps:
+1.
+2. First start the PostgreSQL database container using maven:
+ ```bash
+ mvn docker:start
+ ```
+
+2. Next start the application in Quarkus development mode:
+ ```bash
+ mvn clean quarkus:dev
+ ```
+
+### Accessing the database
+
+To obtain a database session in PostgreSQL, run:
+```bash
+docker run -it --rm --link postgres-1:postgres quay.io/debezium/example-postgres:latest psql -h postgres -U postgresuser --dbname inventory
+```
+
+### Shutdown steps
+
+1. Stop the Quarkus application by pressing `Ctrl+C` in the terminal.
+2. Execute `mvn docker:stop` to stop the PostgreSQL database container.
diff --git a/cache-invalidation/docker-compose.yaml b/cache-invalidation/docker-compose.yaml
deleted file mode 100644
index 0e9e257a..00000000
--- a/cache-invalidation/docker-compose.yaml
+++ /dev/null
@@ -1,20 +0,0 @@
-version: '2'
-services:
- postgres:
- image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
- ports:
- - "5432:5432"
- environment:
- - POSTGRES_USER=postgresuser
- - POSTGRES_PASSWORD=postgrespw
- - POSTGRES_DB=inventory
- order-manager:
- image: debezium-examples/cache-invalidation:latest
- build:
- context: .
- links:
- - postgres
- ports:
- - 8080:8080
- - 8787:8787
- - 9990:9990
diff --git a/cache-invalidation/pom.xml b/cache-invalidation/pom.xml
index a2c4e3c8..e9cb54d1 100644
--- a/cache-invalidation/pom.xml
+++ b/cache-invalidation/pom.xml
@@ -6,31 +6,57 @@
io.debezium.examples
cache-invalidation
1.0-SNAPSHOT
- war
+ jar
cache-invalidation
- false
+ 3.13.0
+ 21
UTF-8
- 11
- 11
- 2.1.3.Final
- 2.1
- 5.4.24.Final
- 11.0.6.Final
- 3.3.2
+ UTF-8
+ quarkus-bom
+ io.quarkus.platform
+ 3.15.1
+ true
+ 3.3.1
+ 15.0.8.Final
+ 2.7.3.Final
+
+
+ ${quarkus.platform.group-id}
+ ${quarkus.platform.artifact-id}
+ ${quarkus.platform.version}
+ pom
+ import
+
+
+
- javax
- javaee-api
- 7.0
- provided
+ io.quarkus
+ quarkus-rest
+
+
+ io.quarkus
+ quarkus-hibernate-orm
+
+
+ io.quarkus
+ quarkus-rest-jackson
+
+
+ io.quarkus
+ quarkus-jdbc-postgresql
+
+
+ io.quarkus
+ quarkus-arc
io.debezium
@@ -43,62 +69,70 @@
${version.debezium}
- org.hibernate
- hibernate-core
- ${version.hibernate}
- provided
+ org.testcontainers
+ testcontainers
org.infinispan
infinispan-core
- ${version.infinispan}
+ 15.0.8.Final
provided
- junit
- junit
- 4.13.1
+ io.quarkus
+ quarkus-junit5
test
io.rest-assured
rest-assured
- 3.2.0
test
-
- org.postgresql
- postgresql
- 42.7.2
-
org.awaitility
awaitility
- 3.1.3
test
- cache-invalidation
-
-
- src/test/resources
- true
-
-
- org.apache.maven.plugins
- maven-war-plugin
- ${mvn.war.version}
+ ${quarkus.platform.group-id}
+ quarkus-maven-plugin
+ ${quarkus.platform.version}
+ true
+
+
+
+ build
+ generate-code
+ generate-code-tests
+ native-image-agent
+
+
+
+
+
+ maven-compiler-plugin
+ ${compiler-plugin.version}
+
+ true
+
+
+
+ maven-surefire-plugin
+ ${surefire-plugin.version}
- false
+
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
- org.apache.maven.plugins
maven-failsafe-plugin
+ ${surefire-plugin.version}
@@ -107,96 +141,62 @@
+
+
+ ${project.build.directory}/${project.build.finalName}-runner
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
+
io.fabric8
docker-maven-plugin
+ 0.43.4
+ 500
+ default
+ true
- db
- quay.io/debezium/example-postgres:${version.debezium.tag}
+
+ debezium/postgres-server-test-database
postgresuser
postgrespw
inventory
+ -E UTF8
+ en_US.utf8
-
- database system is ready to accept connections
-
5432:5432
- postgres
-
-
-
- debezium-examples/cache-invalidation
- order-manager
-
- ${project.basedir}
-
-
-
- db
-
-
- 8080:8080
- 8787:8787
- 9990:9990
-
+
+ postgres
+ true
+ green
+
- WildFly Full .* \(WildFly Core .*\) started
+ (?s)PostgreSQL init process complete.*database system is ready to accept connections
+
+ quay.io/debezium/example-postgres:latest
+
+ ln -fs /usr/share/zoneinfo/US/Samoa /etc/localtime && echo timezone=US/Samoa >> /usr/share/postgresql/postgresql.conf.sample
+
+
+
+ properties
+ override
+
-
-
- start
- pre-integration-test
-
- build
- start
-
-
-
- stop
- post-integration-test
-
- stop
-
-
-
-
-
-
-
- io.fabric8
- docker-maven-plugin
- 0.27.2
-
-
- org.apache.maven.plugins
- maven-failsafe-plugin
- 3.0.0-M1
-
-
- org.wildfly.plugins
- wildfly-maven-plugin
- 2.0.0.Final
-
- localhost
- admin
- secret
-
-
-
-
+
diff --git a/cache-invalidation/resources/wildfly/customization/commands.cli b/cache-invalidation/resources/wildfly/customization/commands.cli
deleted file mode 100644
index 3310a489..00000000
--- a/cache-invalidation/resources/wildfly/customization/commands.cli
+++ /dev/null
@@ -1,14 +0,0 @@
-# Mark the commands below to be run as a batch
-batch
-
-# Add module
-module add --name=org.postgres --resources=/tmp/pg-driver/postgresql-42.3.3.jar --dependencies=javax.api,javax.transaction.api
-
-# Add PostgreSQL driver
-/subsystem=datasources/jdbc-driver=postgres:add(driver-name=postgres,driver-module-name=org.postgres,driver-class-name=org.postgresql.Driver)
-
-# Add the datasource
-data-source add --name=OrderDS --driver-name=postgres --jndi-name=java:jboss/datasources/OrderDS --connection-url=jdbc:postgresql://postgres:5432/inventory --user-name=postgresuser --password=postgrespw --valid-connection-checker-class-name=org.jboss.jca.adapters.jdbc.extensions.postgres.PostgreSQLValidConnectionChecker --exception-sorter-class-name=org.jboss.jca.adapters.jdbc.extensions.postgres.PostgreSQLExceptionSorter
-
-# Execute the batch
-run-batch
diff --git a/cache-invalidation/resources/wildfly/customization/execute.sh b/cache-invalidation/resources/wildfly/customization/execute.sh
deleted file mode 100755
index b559cf40..00000000
--- a/cache-invalidation/resources/wildfly/customization/execute.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-# Usage: execute.sh [WildFly mode] [configuration file]
-#
-# The default mode is 'standalone' and default configuration is based on the
-# mode. It can be 'standalone.xml' or 'domain.xml'.
-
-JBOSS_HOME=/opt/jboss/wildfly
-JBOSS_CLI=$JBOSS_HOME/bin/jboss-cli.sh
-JBOSS_MODE=${1:-"standalone"}
-JBOSS_CONFIG=${2:-"$JBOSS_MODE.xml"}
-
-function wait_for_server() {
- until `$JBOSS_CLI -c "ls /deployment" &> /dev/null`; do
- sleep 1
- done
-}
-
-echo "=> Starting WildFly server"
-$JBOSS_HOME/bin/$JBOSS_MODE.sh -c $JBOSS_CONFIG > /dev/null &
-
-echo "=> Waiting for the server to boot"
-wait_for_server
-
-echo "=> Executing the commands"
-$JBOSS_CLI -c --file=`dirname "$0"`/commands.cli
-
-echo "=> Shutting down WildFly"
-if [ "$JBOSS_MODE" = "standalone" ]; then
- $JBOSS_CLI -c ":shutdown"
-else
- $JBOSS_CLI -c "/host=*:shutdown"
-fi
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java
index 318ec1be..21630498 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/Item.java
@@ -7,9 +7,9 @@
import java.math.BigDecimal;
-import javax.persistence.Cacheable;
-import javax.persistence.Entity;
-import javax.persistence.Id;
+import jakarta.persistence.Cacheable;
+import jakarta.persistence.Entity;
+import jakarta.persistence.Id;
@Entity
@Cacheable
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java
index 4d6f169d..4bcea079 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/model/PurchaseOrder.java
@@ -7,11 +7,11 @@
import java.math.BigDecimal;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.Id;
-import javax.persistence.ManyToOne;
-import javax.persistence.SequenceGenerator;
+import jakarta.persistence.Entity;
+import jakarta.persistence.GeneratedValue;
+import jakarta.persistence.Id;
+import jakarta.persistence.ManyToOne;
+import jakarta.persistence.SequenceGenerator;
@Entity
public class PurchaseOrder {
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java
index 4998d1b0..f4efee48 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/DatabaseChangeEventListener.java
@@ -5,32 +5,35 @@
*/
package io.debezium.examples.cacheinvalidation.persistence;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.Function;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-import javax.enterprise.concurrent.ManagedExecutorService;
-import javax.enterprise.context.ApplicationScoped;
-import javax.enterprise.context.Initialized;
-import javax.enterprise.event.Observes;
-import javax.inject.Inject;
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.PersistenceContext;
-import javax.persistence.PersistenceUnit;
-
+import jakarta.annotation.PreDestroy;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.Initialized;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import jakarta.persistence.EntityManager;
+import jakarta.persistence.EntityManagerFactory;
+import jakarta.persistence.PersistenceContext;
+
+import jakarta.persistence.PersistenceUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.eclipse.microprofile.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
-import io.debezium.connector.postgresql.PostgresConnector;
-import io.debezium.connector.postgresql.PostgresConnectorConfig;
-import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.data.Envelope.Operation;
-import io.debezium.embedded.EmbeddedEngine;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.RecordChangeEvent;
+import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.examples.cacheinvalidation.model.Item;
/**
@@ -44,10 +47,8 @@
@ApplicationScoped
public class DatabaseChangeEventListener {
- private static final Logger LOG = LoggerFactory.getLogger( DatabaseChangeEventListener.class );
-
- @Resource
- private ManagedExecutorService executorService;
+ private static final String CONFIG_PREFIX = "quarkus.debezium-cdc.";
+ private static final Logger LOG = LoggerFactory.getLogger(DatabaseChangeEventListener.class);
@PersistenceUnit
private EntityManagerFactory emf;
@@ -55,43 +56,52 @@ public class DatabaseChangeEventListener {
@PersistenceContext
private EntityManager em;
- private EmbeddedEngine engine;
-
@Inject
private KnownTransactions knownTransactions;
+ private DebeziumEngine> engine;
+ private ExecutorService executorService;
+
+ @Priority(2)
public void startEmbeddedEngine(@Observes @Initialized(ApplicationScoped.class) Object init) {
LOG.info("Launching Debezium embedded engine");
- Configuration config = Configuration.empty()
- .withSystemProperties(Function.identity()).edit()
- .with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class)
- .with(EmbeddedEngine.ENGINE_NAME, "cache-invalidation-engine")
- .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class)
- .with("name", "cache-invalidation-connector")
- .with("database.hostname", "postgres")
- .with("database.port", 5432)
- .with("database.user", "postgresuser")
- .with("database.password", "postgrespw")
- .with("topic.prefix", "dbserver1")
- .with("database.dbname", "inventory")
- .with("table.include.list", "public.item")
- .with("plugin.name", "pgoutput")
- .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
+ final Properties properties = new Properties();
+ for (String propertyName : ConfigProvider.getConfig().getPropertyNames()) {
+ if (propertyName.startsWith(CONFIG_PREFIX)) {
+ final String key = propertyName.replace(CONFIG_PREFIX, "");
+ final ConfigValue value = ConfigProvider.getConfig().getConfigValue(propertyName);
+ properties.put(key, value.getRawValue());
+ LOG.info("\t{}: {}", key, value.getRawValue());
+ }
+ }
+
+ final Configuration config = Configuration.empty()
+ .withSystemProperties(Function.identity())
+ .edit()
+ .with(Configuration.from(properties))
.build();
- this.engine = EmbeddedEngine.create()
- .using(config)
- .notifying(this::handleDbChangeEvent)
+ this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
+ .using(config.asProperties())
+ .notifying((list, recordCommitter) -> {
+ for (RecordChangeEvent record : list) {
+ handleDbChangeEvent(record.record());
+ recordCommitter.markProcessed(record);
+ }
+ recordCommitter.markBatchFinished();
+ })
.build();
+ executorService = Executors.newFixedThreadPool(1);
executorService.execute(engine);
}
@PreDestroy
- public void shutdownEngine() {
+ public void shutdownEngine() throws Exception {
LOG.info("Stopping Debezium embedded engine");
- engine.stop();
+ engine.close();
+ executorService.shutdown();
}
private void handleDbChangeEvent(SourceRecord record) {
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java
index dcc06693..bf77f805 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/KnownTransactions.java
@@ -2,12 +2,13 @@
import java.util.concurrent.TimeUnit;
-import javax.annotation.PreDestroy;
-import javax.enterprise.context.ApplicationScoped;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,11 +29,11 @@ public KnownTransactions() {
cacheManager.defineConfiguration(
"tx-id-cache",
new ConfigurationBuilder()
- .simpleCache(true)
- .expiration()
+ .simpleCache(true)
+ .expiration()
.lifespan(60, TimeUnit.SECONDS)
- .build()
- );
+ .build()
+ );
applicationTransactions = cacheManager.getCache("tx-id-cache");
}
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java
index 92518c82..0c0a7706 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationIntegrator.java
@@ -26,11 +26,11 @@ public class TransactionRegistrationIntegrator implements Integrator {
@Override
public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,
- SessionFactoryServiceRegistry serviceRegistry) {
+ SessionFactoryServiceRegistry serviceRegistry) {
LOG.info("TransactionRegistrationIntegrator#integrate()");
serviceRegistry.getService(EventListenerRegistry.class)
- .appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
+ .appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
}
@Override
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java
index 6e170c38..71f3346a 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/persistence/TransactionRegistrationListener.java
@@ -8,7 +8,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.enterprise.inject.spi.CDI;
+import jakarta.enterprise.inject.spi.CDI;
import org.hibernate.FlushMode;
import org.hibernate.HibernateException;
@@ -23,8 +23,6 @@
*/
class TransactionRegistrationListener implements FlushEventListener {
- private static final long serialVersionUID = 1L;
-
private final ConcurrentMap sessionsWithBeforeTransactionCompletion;
private volatile KnownTransactions knownTransactions;
@@ -43,7 +41,7 @@ public void onFlush(FlushEvent event) throws HibernateException {
event.getSession().getActionQueue().registerProcess( session -> {
Number txId = (Number) event.getSession().createNativeQuery("SELECT txid_current()")
- .setFlushMode(FlushMode.MANUAL)
+ .setHibernateFlushMode(FlushMode.MANUAL)
.getSingleResult();
getKnownTransactions().register(txId.longValue());
@@ -52,7 +50,7 @@ public void onFlush(FlushEvent event) throws HibernateException {
} );
}
- private KnownTransactions getKnownTransactions() {
+ private KnownTransactions getKnownTransactions() {
KnownTransactions value = knownTransactions;
if (value == null) {
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java
index 7ebb3388..6aef7ebb 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/CacheResource.java
@@ -5,16 +5,16 @@
*/
package io.debezium.examples.cacheinvalidation.rest;
-import javax.enterprise.context.ApplicationScoped;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.PersistenceUnit;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.persistence.EntityManagerFactory;
+import jakarta.persistence.PersistenceUnit;
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.DELETE;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
import io.debezium.examples.cacheinvalidation.model.Item;
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java
index e1c3f21e..03e1e822 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/ItemResource.java
@@ -5,17 +5,17 @@
*/
package io.debezium.examples.cacheinvalidation.rest;
-import javax.enterprise.context.ApplicationScoped;
-import javax.persistence.EntityManager;
-import javax.persistence.PersistenceContext;
-import javax.transaction.Transactional;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.NotFoundException;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.persistence.EntityManager;
+import jakarta.persistence.PersistenceContext;
+import jakarta.transaction.Transactional;
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.NotFoundException;
+import jakarta.ws.rs.PUT;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
import io.debezium.examples.cacheinvalidation.model.Item;
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java
index 3b77f1d2..70f402ef 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/OrderResource.java
@@ -7,15 +7,15 @@
import java.math.BigDecimal;
-import javax.enterprise.context.ApplicationScoped;
-import javax.persistence.EntityManager;
-import javax.persistence.PersistenceContext;
-import javax.transaction.Transactional;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.persistence.EntityManager;
+import jakarta.persistence.PersistenceContext;
+import jakarta.transaction.Transactional;
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.POST;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
import io.debezium.examples.cacheinvalidation.model.Item;
import io.debezium.examples.cacheinvalidation.model.PurchaseOrder;
diff --git a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java
index c07cdd4d..4149d891 100644
--- a/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java
+++ b/cache-invalidation/src/main/java/io/debezium/examples/cacheinvalidation/rest/RestApplication.java
@@ -8,8 +8,8 @@
import java.util.HashSet;
import java.util.Set;
-import javax.ws.rs.ApplicationPath;
-import javax.ws.rs.core.Application;
+import jakarta.ws.rs.ApplicationPath;
+import jakarta.ws.rs.core.Application;
@ApplicationPath("/rest")
public class RestApplication extends Application {
diff --git a/cache-invalidation/src/main/resources/META-INF/data.sql b/cache-invalidation/src/main/resources/META-INF/data.sql
index 34187558..aee1c66a 100644
--- a/cache-invalidation/src/main/resources/META-INF/data.sql
+++ b/cache-invalidation/src/main/resources/META-INF/data.sql
@@ -1,3 +1,3 @@
-INSERT INTO Item VALUES (10001, 'The Birds', 9.99);
-INSERT INTO Item VALUES (10002, 'To Catch A Thieve', 12.99);
-INSERT INTO Item VALUES (10003, 'North By Northwest', 14.99);
+INSERT INTO Item (id,description,price) VALUES (10001, 'The Birds', 9.99);
+INSERT INTO Item (id,description,price) VALUES (10002, 'To Catch A Thieve', 12.99);
+INSERT INTO Item (id,description,price) VALUES (10003, 'North By Northwest', 14.99);
diff --git a/cache-invalidation/src/main/resources/META-INF/persistence.xml b/cache-invalidation/src/main/resources/META-INF/persistence.xml
deleted file mode 100644
index 48bf239d..00000000
--- a/cache-invalidation/src/main/resources/META-INF/persistence.xml
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
-
-
- java:jboss/datasources/OrderDS
- ENABLE_SELECTIVE
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/cache-invalidation/src/main/resources/application.properties b/cache-invalidation/src/main/resources/application.properties
new file mode 100644
index 00000000..98c820e8
--- /dev/null
+++ b/cache-invalidation/src/main/resources/application.properties
@@ -0,0 +1,25 @@
+# Datasource
+quarkus.datasource.db-kind=pg
+quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/inventory
+quarkus.datasource.username=postgresuser
+quarkus.datasource.password=postgrespw
+
+# Hibernate configuration
+quarkus.hibernate-orm.log.sql=true
+quarkus.hibernate-orm.log.format-sql=true
+quarkus.hibernate-orm.sql-load-script=META-INF/data.sql
+quarkus.hibernate-orm.database.generation=drop-and-create
+
+# Debezium CDC configuration
+quarkus.debezium-cdc.connector.class=io.debezium.connector.postgresql.PostgresConnector
+quarkus.debezium-cdc.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
+quarkus.debezium-cdc.name=cache-invalidation-connector
+quarkus.debezium-cdc.database.hostname=localhost
+quarkus.debezium-cdc.database.port=5432
+quarkus.debezium-cdc.database.user=postgresuser
+quarkus.debezium-cdc.database.password=postgrespw
+quarkus.debezium-cdc.database.dbname=inventory
+quarkus.debezium-cdc.topic.prefix=dbserver1
+quarkus.debezium-cdc.table.include.list=public.item
+quarkus.debezium-cdc.plugin.name=pgoutput
+quarkus.debezium-cdc.snapshot.mode=never
diff --git a/cache-invalidation/src/main/webapp/WEB-INF/beans.xml b/cache-invalidation/src/main/webapp/WEB-INF/beans.xml
deleted file mode 100644
index 7a29cf42..00000000
--- a/cache-invalidation/src/main/webapp/WEB-INF/beans.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
diff --git a/cache-invalidation/src/main/webapp/WEB-INF/jboss-deployment-structure.xml b/cache-invalidation/src/main/webapp/WEB-INF/jboss-deployment-structure.xml
deleted file mode 100644
index 17da62d0..00000000
--- a/cache-invalidation/src/main/webapp/WEB-INF/jboss-deployment-structure.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
diff --git a/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java b/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java
index 8807a6bc..261c9062 100644
--- a/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java
+++ b/cache-invalidation/src/test/java/io/debezium/examples/cacheinvalidation/CacheInvalidationIT.java
@@ -18,14 +18,27 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
+import io.quarkus.test.junit.QuarkusTest;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.restassured.http.ContentType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@QuarkusTest
public class CacheInvalidationIT {
- @Before
+ @ConfigProperty(name = "quarkus.datasource.jdbc.url")
+ String jdbcUrl;
+
+ @ConfigProperty(name = "quarkus.datasource.username")
+ String jdbcUserName;
+
+ @ConfigProperty(name = "quarkus.datasource.password")
+ String jdbcPassword;
+
+ @BeforeEach
public void prepareItem() {
updateItem(10003, "North by Northwest", 14.99F);
}
@@ -42,7 +55,7 @@ public void shouldInvalidateCacheAfterDatabaseUpdate() throws Exception {
// cache should be invalidated
await().atMost(5, TimeUnit.SECONDS)
.until(() -> {
- return !get("/cache-invalidation/rest/cache/item/10003").as(boolean.class);
+ return !get("/rest/cache/item/10003").as(boolean.class);
});
// and the item reloaded from the DB
@@ -61,7 +74,7 @@ public void shouldNotInvalidateCacheAfterUpdateThroughApplication() throws Excep
Thread.sleep(3000);
// cache should not be invalidated
- assertTrue(get("/cache-invalidation/rest/cache/item/10003").as(boolean.class));
+ assertTrue(get("/rest/cache/item/10003").as(boolean.class));
}
private void placeOrder(long itemId, int quantity, float expectedTotalPrice) {
@@ -75,7 +88,7 @@ private void placeOrder(long itemId, int quantity, float expectedTotalPrice) {
"}"
)
.when()
- .post("/cache-invalidation/rest/orders")
+ .post("/rest/orders")
.then()
.body("totalPrice", equalTo(expectedTotalPrice));
}
@@ -90,17 +103,16 @@ private void updateItem(long itemId, String newDescription, float newPrice) {
"}"
)
.when()
- .put("/cache-invalidation/rest/items/{id}", itemId)
+ .put("/rest/items/{id}", itemId)
.then()
.statusCode(200);
}
private Connection getDbConnection() throws SQLException {
- String url = "jdbc:postgresql://localhost/inventory";
- Properties props = new Properties();
- props.setProperty("user","postgresuser");
- props.setProperty("password","postgrespw");
+ final Properties props = new Properties();
+ props.setProperty("user", jdbcUserName);
+ props.setProperty("password", jdbcPassword);
- return DriverManager.getConnection(url, props);
+ return DriverManager.getConnection(jdbcUrl, props);
}
}
diff --git a/cache-invalidation/src/test/resources/log4j.properties b/cache-invalidation/src/test/resources/log4j.properties
deleted file mode 100644
index 6c5374ed..00000000
--- a/cache-invalidation/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n
-
-# Root logger option
-log4j.rootLogger=DEBUG, stdout
-
-# Set up the default logging to be DEBUG level, then override specific units
-# log4j.logger.io.debezium=INFO