diff --git a/.github/workflows/cd_build.yml b/.github/workflows/cd_build.yml index c653cd67..180d7351 100644 --- a/.github/workflows/cd_build.yml +++ b/.github/workflows/cd_build.yml @@ -25,7 +25,7 @@ jobs: - name: Build, publish to GPR and tag run: | if [ "$GITHUB_REPOSITORY" == "gruelbox/transaction-outbox" ]; then - revision="5.4.$GITHUB_RUN_NUMBER" + revision="5.5.$GITHUB_RUN_NUMBER" echo "Building $revision at $GITHUB_SHA" mvn -Pconcise,delombok -B deploy -s $GITHUB_WORKSPACE/settings.xml -Drevision="$revision" -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn echo "Tagging $revision" diff --git a/README.md b/README.md index f7bf45ed..db1b7cfe 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ A flexible implementation of the [Transaction Outbox Pattern](https://microservi 1. [Set up the background worker](#set-up-the-background-worker) 1. [Managing the "dead letter queue"](#managing-the-dead-letter-queue) 1. [Advanced](#advanced) + 1. [Topics and FIFO ordering](#topics-and-fifo-ordering) 1. [The nested outbox pattern](#the-nested-outbox-pattern) 1. [Idempotency protection](#idempotency-protection) 1. [Flexible serialization](#flexible-serialization-beta) @@ -291,13 +292,13 @@ TransactionOutbox.builder() To mark the work for reprocessing, just use [`TransactionOutbox.unblock()`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-core/latest/com/gruelbox/transactionoutbox/TransactionOutbox.html). Its failure count will be marked back down to zero and it will get reprocessed on the next call to `flush()`: -``` +```java transactionOutboxEntry.unblock(entryId); ``` Or if using a `TransactionManager` that relies on explicit context (such as a non-thread local [`JooqTransactionManager`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-jooq/latest/com/gruelbox/transactionoutbox/JooqTransactionManager.html)): -``` +```java transactionOutboxEntry.unblock(entryId, context); ``` @@ -305,6 +306,54 @@ A good approach here is to use the [`TransactionOutboxListener`](https://www.jav ## Advanced +### Topics and FIFO ordering + +For some applications, the order in which tasks are processed is important, such as when: + + - using the outbox to write to a FIFO queue, Kafka or AWS Kinesis topic; or + - data replication, e.g. when feeding a data warehouse or distributed cache. + +In these scenarios, the default behaviour is unsuitable. Tasks are usually processed in a highly parallel fashion. +Even if the volume of tasks is low, if a task fails and is retried later, it can easily end up processing after +some later task even if that later task was processed hours or even days after the failing one. + +To avoid problems associated with tasks being processed out-of-order, you can order the processing of your tasks +within a named "topic": + +```java +outbox.with().ordered("topic1").schedule(Service.class).process("red"); +outbox.with().ordered("topic2").schedule(Service.class).process("green"); +outbox.with().ordered("topic1").schedule(Service.class).process("blue"); +outbox.with().ordered("topic2").schedule(Service.class).process("yellow"); +``` + +No matter what happens: + + - `red` will always need to be processed (successfully) before `blue`; + - `green` will always need to be processed (successfully) before `yellow`; but + - `red` and `blue` can run in any sequence with respect to `green` and `yellow`. + +This functionality was specifically designed to allow outboxed writing to Kafka topics. For maximum throughput +when writing to Kafka, it is advised that you form your outbox topic name by combining the Kafka topic and partition, +since that is the boundary where ordering is required. + +There are a number of things to consider before using this feature: + + - Tasks are not processed immediately when submitting, as normal, and are processed by + background flushing only. This means there will be an increased delay between the source transaction being + committed and the task being processed, depending on how your application calls `TransactionOutbox.flush()`. + - If a task fails, no further requests will be processed _in that topic_ until + a subsequent retry allows the failing task to succeed, to preserve ordered + processing. This means it is possible for topics to become entirely frozen in the event + that a task fails repeatedly. For this reason, it is essential to use a + `TransactionOutboxListener` to watch for failing tasks and investigate quickly. Note + that other topics will be unaffected. + - `TransactionOutboxBuilder.blockAfterAttempts` is ignored for all tasks that use this + option. + - A single topic can only be processed in single-threaded fashion, but separate topics can be processed in + parallel. If your tasks use a small number of topics, scalability will be affected since the degree of + parallelism will be reduced. + ### The nested-outbox pattern In practice it can be extremely hard to guarantee that an entire unit of work is idempotent and thus suitable for retry. For example, the request might be to "update a customer record" with a new address, but this might record the change to an audit history table with a fresh UUID, the current date and time and so on, which in turn triggers external changes outside the transaction. The parent customer update request may be idempotent, but the downstream effects may not be. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index 53d512bb..7eb95e93 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -6,6 +6,7 @@ import java.util.Collection; import java.util.Map; import java.util.TreeMap; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; import lombok.AccessLevel; @@ -41,6 +42,17 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep } } + @Override + public String fetchAndLockNextInTopic(String fields, String table) { + return String.format( + "SELECT %s FROM %s" + + " WHERE topic = ?" + + " AND processed = %s" + + " ORDER BY seq ASC" + + " %s FOR UPDATE", + fields, table, booleanValue(false), limitCriteria.replace("?", "1")); + } + @Override public String toString() { return name; @@ -63,6 +75,7 @@ static final class Builder { private Map migrations; private Function booleanValueFrom; private SQLAction createVersionTableBy; + private BiFunction fetchAndLockNextInTopic; Builder(String name) { this.name = name; @@ -120,6 +133,27 @@ static final class Builder { 8, "Update length of invocation column on outbox for MySQL dialects only.", "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); + migrations.put( + 9, + new Migration( + 9, + "Add topic", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NOT NULL DEFAULT '*'")); + migrations.put( + 10, + new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); + migrations.put( + 11, + new Migration( + 11, + "Add sequence table", + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))")); + migrations.put( + 12, + new Migration( + 12, + "Add flush index to support ordering", + "CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)")); } Builder setMigration(Migration migration) { @@ -154,6 +188,14 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep super.createVersionTableIfNotExists(connection); } } + + @Override + public String fetchAndLockNextInTopic(String fields, String table) { + if (fetchAndLockNextInTopic != null) { + return fetchAndLockNextInTopic.apply(fields, table); + } + return super.fetchAndLockNextInTopic(fields, table); + } }; } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 5a7c37ed..c194e594 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -4,16 +4,9 @@ import java.io.Reader; import java.io.StringWriter; import java.io.Writer; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.SQLTimeoutException; -import java.sql.Statement; -import java.sql.Timestamp; +import java.sql.*; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -37,7 +30,7 @@ public class DefaultPersistor implements Persistor, Validatable { private static final String ALL_FIELDS = - "id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; + "id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; /** * @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write @@ -110,26 +103,28 @@ public void writeSchema(Writer writer) { public void save(Transaction tx, TransactionOutboxEntry entry) throws SQLException, AlreadyScheduledException { var insertSql = - "INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "INSERT INTO " + + tableName + + " (" + + ALL_FIELDS + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; var writer = new StringWriter(); serializer.serializeInvocation(entry.getInvocation(), writer); + if (entry.getTopic() != null) { + setNextSequence(tx, entry); + log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); + } + PreparedStatement stmt = tx.prepareBatchStatement(insertSql); + setupInsert(entry, writer, stmt); if (entry.getUniqueRequestId() == null) { - PreparedStatement stmt = tx.prepareBatchStatement(insertSql); - setupInsert(entry, writer, stmt); stmt.addBatch(); log.debug("Inserted {} in batch", entry.description()); } else { - //noinspection resource - try (PreparedStatement stmt = tx.connection().prepareStatement(insertSql)) { - setupInsert(entry, writer, stmt); + try { stmt.executeUpdate(); log.debug("Inserted {} immediately", entry.description()); - } catch (SQLIntegrityConstraintViolationException e) { - throw new AlreadyScheduledException( - "Request " + entry.description() + " already exists", e); } catch (Exception e) { - if (e.getClass().getName().equals("org.postgresql.util.PSQLException") - && e.getMessage().contains("constraint")) { + if (indexViolation(e)) { throw new AlreadyScheduledException( "Request " + entry.description() + " already exists", e); } @@ -138,19 +133,65 @@ public void save(Transaction tx, TransactionOutboxEntry entry) } } + private void setNextSequence(Transaction tx, TransactionOutboxEntry entry) throws SQLException { + //noinspection resource + var seqSelect = + tx.prepareBatchStatement("SELECT seq FROM TXNO_SEQUENCE WHERE topic = ? FOR UPDATE"); + seqSelect.setString(1, entry.getTopic()); + try (ResultSet rs = seqSelect.executeQuery()) { + if (rs.next()) { + entry.setSequence(rs.getLong(1) + 1L); + //noinspection resource + var seqUpdate = + tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE topic = ?"); + seqUpdate.setLong(1, entry.getSequence()); + seqUpdate.setString(2, entry.getTopic()); + seqUpdate.executeUpdate(); + } else { + try { + entry.setSequence(1L); + //noinspection resource + var seqInsert = + tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (topic, seq) VALUES (?, ?)"); + seqInsert.setString(1, entry.getTopic()); + seqInsert.setLong(2, entry.getSequence()); + seqInsert.executeUpdate(); + } catch (Exception e) { + if (indexViolation(e)) { + setNextSequence(tx, entry); + } else { + throw e; + } + } + } + } + } + + private boolean indexViolation(Exception e) { + return (e instanceof SQLIntegrityConstraintViolationException) + || (e.getClass().getName().equals("org.postgresql.util.PSQLException") + && e.getMessage().contains("constraint")); + } + private void setupInsert( TransactionOutboxEntry entry, StringWriter writer, PreparedStatement stmt) throws SQLException { stmt.setString(1, entry.getId()); stmt.setString(2, entry.getUniqueRequestId()); stmt.setString(3, writer.toString()); + stmt.setString(4, entry.getTopic() == null ? "*" : entry.getTopic()); + if (entry.getSequence() == null) { + stmt.setObject(5, null); + } else { + stmt.setLong(5, entry.getSequence()); + } stmt.setTimestamp( - 4, entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime())); - stmt.setTimestamp(5, Timestamp.from(entry.getNextAttemptTime())); - stmt.setInt(6, entry.getAttempts()); - stmt.setBoolean(7, entry.isBlocked()); - stmt.setBoolean(8, entry.isProcessed()); - stmt.setInt(9, entry.getVersion()); + 6, entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime())); + stmt.setTimestamp(7, Timestamp.from(entry.getNextAttemptTime())); + stmt.setInt(8, entry.getAttempts()); + stmt.setBoolean(9, entry.isBlocked()); + stmt.setBoolean(10, entry.isProcessed()); + stmt.setInt(11, entry.getVersion()); } @Override @@ -239,22 +280,24 @@ public boolean lock(Transaction tx, TransactionOutboxEntry entry) throws Excepti @Override public boolean unblock(Transaction tx, String entryId) throws Exception { - @SuppressWarnings("resource") - PreparedStatement stmt = - tx.prepareBatchStatement( - "UPDATE " - + tableName - + " SET attempts = 0, blocked = " - + dialect.booleanValue(false) - + " " - + "WHERE blocked = " - + dialect.booleanValue(true) - + " AND processed = " - + dialect.booleanValue(false) - + " AND id = ?"); - stmt.setString(1, entryId); - stmt.setQueryTimeout(writeLockTimeoutSeconds); - return stmt.executeUpdate() != 0; + //noinspection resource + try (PreparedStatement stmt = + tx.connection() + .prepareStatement( + "UPDATE " + + tableName + + " SET attempts = 0, blocked = " + + dialect.booleanValue(false) + + " " + + "WHERE blocked = " + + dialect.booleanValue(true) + + " AND processed = " + + dialect.booleanValue(false) + + " AND id = ?")) { + stmt.setString(1, entryId); + stmt.setQueryTimeout(writeLockTimeoutSeconds); + return stmt.executeUpdate() != 0; + } } @Override @@ -274,11 +317,44 @@ public List selectBatch(Transaction tx, int batchSize, I + dialect.booleanValue(false) + " AND processed = " + dialect.booleanValue(false) + + " AND topic = '*'" + dialect.getLimitCriteria() + forUpdate)) { stmt.setTimestamp(1, Timestamp.from(now)); stmt.setInt(2, batchSize); - return gatherResults(batchSize, stmt); + var result = new ArrayList(batchSize); + gatherResults(stmt, result); + return result; + } + } + + @Override + public List selectActiveTopics(Transaction tx) throws Exception { + var sql = "SELECT DISTINCT topic FROM %s WHERE topic <> '*' AND processed = %s"; + String falseStr = dialect.booleanValue(false); + //noinspection resource + try (PreparedStatement stmt = + tx.connection().prepareStatement(String.format(sql, tableName, falseStr, falseStr))) { + var result = new ArrayList(); + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + result.add(rs.getString(1)); + } + } + return result; + } + } + + @Override + public Optional nextInTopic(Transaction tx, String topic) + throws Exception { + //noinspection resource + try (PreparedStatement stmt = + tx.connection().prepareStatement(dialect.fetchAndLockNextInTopic(ALL_FIELDS, tableName))) { + stmt.setString(1, topic); + var results = new ArrayList(1); + gatherResults(stmt, results); + return results.stream().findFirst(); } } @@ -295,25 +371,30 @@ public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) } } - private List gatherResults(int batchSize, PreparedStatement stmt) + private void gatherResults(PreparedStatement stmt, Collection output) throws SQLException, IOException { try (ResultSet rs = stmt.executeQuery()) { - ArrayList result = new ArrayList<>(batchSize); while (rs.next()) { - result.add(map(rs)); + output.add(map(rs)); } - log.debug("Found {} results", result.size()); - return result; + log.debug("Found {} results", output.size()); } } private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOException { + String topic = rs.getString("topic"); + Long sequence = rs.getLong("seq"); + if (rs.wasNull()) { + sequence = null; + } try (Reader invocationStream = rs.getCharacterStream("invocation")) { TransactionOutboxEntry entry = TransactionOutboxEntry.builder() .id(rs.getString("id")) .uniqueRequestId(rs.getString("uniqueRequestId")) .invocation(serializer.deserializeInvocation(invocationStream)) + .topic("*".equals(topic) ? null : topic) + .sequence(sequence) .lastAttemptTime( rs.getTimestamp("lastAttemptTime") == null ? null diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 7f54401f..c1aff8ed 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -28,6 +28,8 @@ public interface Dialect { void createVersionTableIfNotExists(Connection connection) throws SQLException; + String fetchAndLockNextInTopic(String fields, String table); + Stream getMigrations(); Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); @@ -74,6 +76,11 @@ public interface Dialect { .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") .changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD lastAttemptTime TIMESTAMP(6)") .disableMigration(8) + .changeMigration(9, "ALTER TABLE TXNO_OUTBOX ADD topic VARCHAR(250) DEFAULT '*' NOT NULL") + .changeMigration(10, "ALTER TABLE TXNO_OUTBOX ADD seq NUMBER") + .changeMigration( + 11, + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq NUMBER NOT NULL, CONSTRAINT PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))") .booleanValueFrom(v -> v ? "1" : "0") .createVersionTableBy( connection -> { @@ -88,5 +95,15 @@ public interface Dialect { } } }) + .fetchAndLockNextInTopic( + (fields, table) -> + String.format( + "SELECT %s FROM %s outer" + + " WHERE outer.topic = ?" + + " AND outer.processed = 0" + + " AND outer.seq = (" + + "SELECT MIN(seq) FROM %s inner WHERE inner.topic=outer.topic AND inner.processed=0" + + " ) FOR UPDATE", + fields, table, table)) .build(); } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java index 0a91e113..3cc815d8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java @@ -5,6 +5,7 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.Callable; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; @@ -108,6 +109,24 @@ void withinMDC(Runnable runnable) { } } + T withinMDC(Callable callable) throws Exception { + if (mdc != null && MDC.getMDCAdapter() != null) { + var oldMdc = MDC.getCopyOfContextMap(); + MDC.setContextMap(mdc); + try { + return callable.call(); + } finally { + if (oldMdc == null) { + MDC.clear(); + } else { + MDC.setContextMap(oldMdc); + } + } + } else { + return callable.call(); + } + } + void invoke(Object instance, TransactionOutboxListener listener) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { Method method = instance.getClass().getDeclaredMethod(methodName, parameterTypes); diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java index 8b03f157..bb54baa9 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java @@ -2,6 +2,7 @@ import java.time.Instant; import java.util.List; +import java.util.Optional; /** * Saves and loads {@link TransactionOutboxEntry}s. For most use cases, just use {@link @@ -105,6 +106,25 @@ static DefaultPersistor forDialect(Dialect dialect) { List selectBatch(Transaction tx, int batchSize, Instant now) throws Exception; + /** + * Selects the list of topics with work awaiting processing. + * + * @param tx The current {@link Transaction}. + * @return The topics. + * @throws Exception Any exception. + */ + List selectActiveTopics(final Transaction tx) throws Exception; + + /** + * Fetches and locks the next available piece of work on the specified topic. + * + * @param tx The current {@link Transaction}. + * @param topic The topic. + * @return The next available piece of work on the selected topic. + * @throws Exception ANy exception. + */ + Optional nextInTopic(Transaction tx, String topic) throws Exception; + /** * Deletes records which have processed and passed their expiry time, in specified batch sizes. * diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java index 45d8d449..a9826388 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java @@ -2,6 +2,7 @@ import java.time.Instant; import java.util.List; +import java.util.Optional; import lombok.Builder; /** Stub implementation of {@link Persistor}. */ @@ -45,6 +46,16 @@ public List selectBatch(Transaction tx, int batchSize, I return List.of(); } + @Override + public List selectActiveTopics(Transaction tx) { + return List.of(); + } + + @Override + public Optional nextInTopic(Transaction tx, String topic) { + return Optional.empty(); + } + @Override public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) { return 0; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index f240a2e7..59bcdf67 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -66,6 +66,17 @@ static TransactionOutboxBuilder builder() { */ ParameterizedScheduleBuilder with(); + /** + * Flush in a single thread. Calls {@link #flush(Executor)} with an {@link Executor} which runs + * all work in the current thread. + * + * @see #flush(Executor) + * @return true if any work was flushed. + */ + default boolean flush() { + return flush(Runnable::run); + } + /** * Identifies any stale tasks queued using {@link #schedule(Class)} (those which were queued more * than supplied {@link TransactionOutboxBuilder#attemptFrequency(Duration)} ago and have been @@ -85,10 +96,11 @@ static TransactionOutboxBuilder builder() { *

Additionally, expires any records completed prior to the {@link * TransactionOutboxBuilder#retentionThreshold(Duration)}. * + * @param executor to be used for parallelising work (note that the method overall is blocking and + * this is solely ued for fork-join semantics). * @return true if any work was flushed. */ - @SuppressWarnings("UnusedReturnValue") - boolean flush(); + boolean flush(Executor executor); /** * Unblocks a blocked entry and resets the attempt count so that it will be retried again. @@ -312,6 +324,50 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId); + /** + * Specifies that the request should be applied in a strictly-ordered fashion within the + * specified topic. + * + *

This is useful for a number of applications, such as feeding messages into an ordered + * pipeline such as a FIFO queue or Kafka topic, or for reliable data replication, such as when + * feeding a data warehouse or distributed cache. + * + *

Note that using this option has a number of consequences: + * + *

    + *
  • Requests are not processed immediately when submitting a request, as normal, and are + * processed by {@link TransactionOutbox#flush()} only. As a result there will be + * increased delay between the source transaction being committed and the request being + * processed. + *
  • If a request fails, no further requests will be processed in that topic until + * a subsequent retry allows the failing request to succeed, to preserve ordered + * processing. This means it is possible for topics to become entirely frozen in the event + * that a request fails repeatedly. For this reason, it is essential to use a {@link + * TransactionOutboxListener} to watch for failing requests and investigate quickly. Note + * that other topics will be unaffected. + *
  • For the same reason, {@link TransactionOutboxBuilder#blockAfterAttempts} is ignored for + * all requests that use this option. The only safe way to recover from a failing request + * is to make the request succeed. + *
  • A single topic can only be processed in single-threaded fashion, so if your requests + * use a small number of topics, scalability will be affected since the degree of + * parallelism will be reduced. + *
  • Throughput is significantly reduced and database load increased more generally, even + * with larger numbers of topics, since records are only processed one-at-a-time rather + * than in batches, which is less optimised. + *
  • In general, databases + * are not well optimised for this sort of thing. Don't expect miracles. If you need + * more throughput, you probably need to think twice about your architecture. Consider the + * event sourcing + * pattern, for example, where the message queue is the primary data store rather than + * a secondary, and remove the need for an outbox entirely. + *
+ * + * @param topic a free-text string up to 250 characters. + * @return Builder. + */ + ParameterizedScheduleBuilder ordered(String topic); + /** * Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters * to the request as configured using {@link TransactionOutbox#with()}. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index f86e4248..01749722 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -4,11 +4,7 @@ import java.time.Instant; import java.util.Arrays; -import lombok.AccessLevel; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import lombok.experimental.SuperBuilder; /** @@ -36,6 +32,21 @@ public class TransactionOutboxEntry implements Validatable { @Getter private final String uniqueRequestId; + /** + * @param topic An optional scope for ordered sequencing. + */ + @SuppressWarnings("JavaDoc") + @Getter + private final String topic; + + /** + * @param sequence The ordered sequence within the {@code topic}. + */ + @SuppressWarnings("JavaDoc") + @Getter + @Setter + private Long sequence; + /** * @param invocation The method invocation to perform. * @return The method invocation to perform. @@ -113,7 +124,7 @@ public String description() { if (!this.initialized) { String description = String.format( - "%s.%s(%s) [%s]%s", + "%s.%s(%s) [%s]%s%s", invocation.getClassName(), invocation.getMethodName(), invocation.getArgs() == null @@ -122,7 +133,8 @@ public String description() { .map(this::stringify) .collect(joining(", ")), id, - uniqueRequestId == null ? "" : " uid=[" + uniqueRequestId + "]"); + uniqueRequestId == null ? "" : " uid=[" + uniqueRequestId + "]", + topic == null ? "" : " seq=[" + topic + "/" + sequence + "]"); this.description = description; this.initialized = true; return description; @@ -149,9 +161,10 @@ private String stringify(Object o) { public void validate(Validator validator) { validator.notNull("id", id); validator.nullOrNotBlank("uniqueRequestId", uniqueRequestId); + validator.nullOrNotBlank("topic", topic); validator.notNull("invocation", invocation); - validator.inFuture("nextAttemptTime", nextAttemptTime); validator.positiveOrZero("attempts", attempts); validator.positiveOrZero("version", version); + validator.isTrue("topic", !"*".equals(topic), "Topic may not be *"); } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 022f5145..10436da5 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -8,23 +8,27 @@ import com.gruelbox.transactionoutbox.spi.ProxyFactory; import com.gruelbox.transactionoutbox.spi.Utils; import java.lang.reflect.InvocationTargetException; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; +import java.time.*; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.ToString; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.slf4j.event.Level; @Slf4j -class TransactionOutboxImpl implements TransactionOutbox, Validatable { - - private static final int DEFAULT_FLUSH_BATCH_SIZE = 4096; +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +final class TransactionOutboxImpl implements TransactionOutbox, Validatable { private final TransactionManager transactionManager; private final Persistor persistor; @@ -42,39 +46,6 @@ class TransactionOutboxImpl implements TransactionOutbox, Validatable { private final AtomicBoolean initialized = new AtomicBoolean(); private final ProxyFactory proxyFactory = new ProxyFactory(); - private TransactionOutboxImpl( - TransactionManager transactionManager, - Instantiator instantiator, - Submitter submitter, - Duration attemptFrequency, - int blockAfterAttempts, - int flushBatchSize, - Supplier clockProvider, - TransactionOutboxListener listener, - Persistor persistor, - Level logLevelTemporaryFailure, - Boolean serializeMdc, - Duration retentionThreshold, - Boolean initializeImmediately) { - this.transactionManager = transactionManager; - this.instantiator = Utils.firstNonNull(instantiator, Instantiator::usingReflection); - this.persistor = persistor; - this.submitter = Utils.firstNonNull(submitter, Submitter::withDefaultExecutor); - this.attemptFrequency = Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES)); - this.blockAfterAttempts = blockAfterAttempts < 1 ? 5 : blockAfterAttempts; - this.flushBatchSize = flushBatchSize < 1 ? DEFAULT_FLUSH_BATCH_SIZE : flushBatchSize; - this.clockProvider = clockProvider == null ? Clock::systemDefaultZone : clockProvider; - this.listener = Utils.firstNonNull(listener, () -> new TransactionOutboxListener() {}); - this.logLevelTemporaryFailure = Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN); - this.validator = new Validator(this.clockProvider); - this.serializeMdc = serializeMdc == null || serializeMdc; - this.retentionThreshold = retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold; - this.validator.validate(this); - if (initializeImmediately == null || initializeImmediately) { - initialize(); - } - } - @Override public void validate(Validator validator) { validator.notNull("transactionManager", transactionManager); @@ -108,7 +79,7 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null); + return schedule(clazz, null, null); } @Override @@ -116,19 +87,7 @@ public ParameterizedScheduleBuilder with() { return new ParameterizedScheduleBuilderImpl(); } - @SuppressWarnings("UnusedReturnValue") - @Override - public boolean flush() { - if (!initialized.get()) { - throw new IllegalStateException("Not initialized"); - } - Instant now = clockProvider.get().instant(); - List batch = flush(now); - expireIdempotencyProtection(now); - return !batch.isEmpty(); - } - - private List flush(Instant now) { + private boolean flushStale(Instant now) { log.debug("Flushing stale tasks"); var batch = transactionManager.inTransactionReturns( @@ -150,7 +109,34 @@ private List flush(Instant now) { log.debug("Got batch of {}", batch.size()); batch.forEach(this::submitNow); log.debug("Submitted batch"); - return batch; + return !batch.isEmpty(); + } + + @Override + public boolean flush(Executor executor) { + if (!initialized.get()) { + throw new IllegalStateException("Not initialized"); + } + Instant now = clockProvider.get().instant(); + List> futures = new ArrayList<>(); + + futures.add(CompletableFuture.supplyAsync(() -> flushStale(now), executor)); + + futures.add( + CompletableFuture.runAsync(() -> expireIdempotencyProtection(now), executor) + .thenApply(it -> false)); + + var topics = + transactionManager.inTransactionReturns( + tx -> uncheckedly(() -> persistor.selectActiveTopics(tx))); + if (!topics.isEmpty()) { + topics.stream() + .map(topic -> CompletableFuture.supplyAsync(() -> processNextInTopic(topic), executor)) + .forEach(futures::add); + } + + var allResults = futures.stream().reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> d1 || d2)); + return allResults.map(CompletableFuture::join).orElse(false); } private void expireIdempotencyProtection(Instant now) { @@ -222,7 +208,7 @@ public boolean unblock(String entryId, Object transactionContext) { } } - private T schedule(Class clazz, String uniqueRequestId) { + private T schedule(Class clazz, String uniqueRequestId, String topic) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -238,7 +224,8 @@ private T schedule(Class clazz, String uniqueRequestId) { extracted.getMethodName(), extracted.getParameters(), extracted.getArgs(), - uniqueRequestId); + uniqueRequestId, + topic); validator.validate(entry); persistor.save(extracted.getTransaction(), entry); extracted @@ -246,7 +233,9 @@ private T schedule(Class clazz, String uniqueRequestId) { .addPostCommitHook( () -> { listener.scheduled(entry); - submitNow(entry); + if (entry.getTopic() == null) { + submitNow(entry); + } }); log.debug( "Scheduled {} for running after transaction commit", entry.description()); @@ -261,45 +250,94 @@ private void submitNow(TransactionOutboxEntry entry) { @Override @SuppressWarnings("WeakerAccess") public void processNow(TransactionOutboxEntry entry) { + initialize(); + Boolean success = null; + try { + success = + transactionManager.inTransactionReturnsThrows( + tx -> { + if (!persistor.lock(tx, entry)) { + return false; + } + processWithExistingLock(tx, entry); + return true; + }); + } catch (InvocationTargetException e) { + updateAttemptCount(entry, e.getCause()); + } catch (Exception e) { + updateAttemptCount(entry, e); + } + if (success != null) { + if (success) { + log.info("Processed {}", entry.description()); + listener.success(entry); + } else { + log.debug("Skipped task {} - may be locked or already processed", entry.getId()); + } + } + } + + private boolean processNextInTopic(String topic) { + var attempted = new AtomicReference(); + var success = false; + try { + success = + transactionManager.inTransactionReturnsThrows( + tx -> { + var next = persistor.nextInTopic(tx, topic); + if (next.isPresent()) { + if (next.get().getNextAttemptTime().isBefore(clockProvider.get().instant())) { + log.info("Topic {}: processing seq {}", topic, next.get().getSequence()); + attempted.set(next.get()); + processWithExistingLock(tx, next.get()); + return true; + } else { + log.info( + "Topic {}: ignoring until {}", + topic, + ZonedDateTime.ofInstant(next.get().getNextAttemptTime(), ZoneId.of("UTC"))); + return false; + } + } else { + return false; + } + }); + } catch (InvocationTargetException e) { + if (attempted.get() != null) { + updateAttemptCount(attempted.get(), e.getCause()); + } + } catch (Exception e) { + if (attempted.get() != null) { + updateAttemptCount(attempted.get(), e); + } + } + if (success) { + log.info("Processed {}", attempted.get().description()); + listener.success(attempted.get()); + } + return success; + } + + private void processWithExistingLock(Transaction tx, TransactionOutboxEntry entry) + throws Exception { + initialize(); entry .getInvocation() .withinMDC( () -> { - try { - initialize(); - var success = - transactionManager.inTransactionReturnsThrows( - transaction -> { - if (!persistor.lock(transaction, entry)) { - return false; - } - log.info("Processing {}", entry.description()); - invoke(entry, transaction); - if (entry.getUniqueRequestId() == null) { - persistor.delete(transaction, entry); - } else { - log.debug( - "Deferring deletion of {} by {}", - entry.description(), - retentionThreshold); - entry.setProcessed(true); - entry.setLastAttemptTime(Instant.now(clockProvider.get())); - entry.setNextAttemptTime(after(retentionThreshold)); - persistor.update(transaction, entry); - } - return true; - }); - if (success) { - log.info("Processed {}", entry.description()); - listener.success(entry); - } else { - log.debug("Skipped task {} - may be locked or already processed", entry.getId()); - } - } catch (InvocationTargetException e) { - updateAttemptCount(entry, e.getCause()); - } catch (Exception e) { - updateAttemptCount(entry, e); + log.info("Processing {}", entry.description()); + invoke(entry, tx); + if (entry.getUniqueRequestId() == null) { + persistor.delete(tx, entry); + } else { + log.debug( + "Deferring deletion of {} by {}", entry.description(), retentionThreshold); + entry.setProcessed(true); + entry.setLastAttemptTime(Instant.now(clockProvider.get())); + entry.setNextAttemptTime(after(retentionThreshold)); + persistor.update(tx, entry); } + return true; }); } @@ -313,7 +351,12 @@ private void invoke(TransactionOutboxEntry entry, Transaction transaction) } private TransactionOutboxEntry newEntry( - Class clazz, String methodName, Class[] params, Object[] args, String uniqueRequestId) { + Class clazz, + String methodName, + Class[] params, + Object[] args, + String uniqueRequestId, + String topic) { return TransactionOutboxEntry.builder() .id(UUID.randomUUID().toString()) .invocation( @@ -324,8 +367,9 @@ private TransactionOutboxEntry newEntry( args, serializeMdc && (MDC.getMDCAdapter() != null) ? MDC.getCopyOfContextMap() : null)) .lastAttemptTime(null) - .nextAttemptTime(after(attemptFrequency)) + .nextAttemptTime(clockProvider.get().instant()) .uniqueRequestId(uniqueRequestId) + .topic(topic) .build(); } @@ -350,12 +394,9 @@ private Instant after(Duration duration) { private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) { try { entry.setAttempts(entry.getAttempts() + 1); - var blocked = entry.getAttempts() >= blockAfterAttempts; + var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts); entry.setBlocked(blocked); - entry.setLastAttemptTime(Instant.now(clockProvider.get())); - entry.setNextAttemptTime(after(attemptFrequency)); - validator.validate(entry); - transactionManager.inTransactionThrows(transaction -> persistor.update(transaction, entry)); + transactionManager.inTransactionThrows(tx -> pushBack(tx, entry)); listener.failure(entry, cause); if (blocked) { log.error( @@ -390,39 +431,43 @@ static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder { } public TransactionOutboxImpl build() { - return new TransactionOutboxImpl( - transactionManager, - instantiator, - submitter, - attemptFrequency, - blockAfterAttempts, - flushBatchSize, - clockProvider, - listener, - persistor, - logLevelTemporaryFailure, - serializeMdc, - retentionThreshold, - initializeImmediately); + Validator validator = new Validator(this.clockProvider); + TransactionOutboxImpl impl = + new TransactionOutboxImpl( + transactionManager, + persistor, + Utils.firstNonNull(instantiator, Instantiator::usingReflection), + Utils.firstNonNull(submitter, Submitter::withDefaultExecutor), + Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES)), + Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN), + blockAfterAttempts < 1 ? 5 : blockAfterAttempts, + flushBatchSize < 1 ? 4096 : flushBatchSize, + clockProvider == null ? Clock::systemDefaultZone : clockProvider, + Utils.firstNonNull(listener, () -> TransactionOutboxListener.EMPTY), + serializeMdc == null || serializeMdc, + validator, + retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold); + validator.validate(impl); + if (initializeImmediately == null || initializeImmediately) { + impl.initialize(); + } + return impl; } } + @Accessors(fluent = true, chain = true) + @Setter private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleBuilder { private String uniqueRequestId; - - @Override - public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { - this.uniqueRequestId = uniqueRequestId; - return this; - } + private String ordered; @Override public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered); } } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java index d7c9f2f4..c0240cb8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java @@ -5,6 +5,8 @@ /** A listener for events fired by {@link TransactionOutbox}. */ public interface TransactionOutboxListener { + TransactionOutboxListener EMPTY = new TransactionOutboxListener() {}; + /** * Fired when a transaction outbox task is scheduled. * diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java index 0454c930..a672afa5 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java @@ -1,7 +1,6 @@ package com.gruelbox.transactionoutbox; import java.time.Clock; -import java.time.Instant; import java.util.function.Supplier; class Validator { @@ -38,6 +37,12 @@ public void notNull(String propertyName, Object object) { } } + public void isTrue(String propertyName, boolean condition, String message, Object... args) { + if (!condition) { + error(propertyName, String.format(message, args)); + } + } + public void nullOrNotBlank(String propertyName, String object) { if (object != null && object.isEmpty()) { error(propertyName, "may be either null or non-blank"); @@ -51,13 +56,6 @@ public void notBlank(String propertyName, String object) { } } - public void inFuture(String propertyName, Instant object) { - notNull(propertyName, object); - if (!object.isAfter(clockProvider.get().instant())) { - error(propertyName, "must be in the future"); - } - } - public void positiveOrZero(String propertyName, int object) { min(propertyName, object, 0); } diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java index 305a97be..e1921d03 100644 --- a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java @@ -1,7 +1,6 @@ package com.gruelbox.transactionoutbox; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; import java.math.BigDecimal; import java.time.Clock; @@ -21,28 +20,6 @@ class TestValidator { private final Instant now = Instant.now(); private final Validator validator = new Validator(() -> Clock.fixed(now, ZoneId.of("+4"))); - @Test - void testEntryDateInPast() { - TransactionOutboxEntry entry = - TransactionOutboxEntry.builder() - .id("FOO") - .invocation(COMPLEX_INVOCATION) - .nextAttemptTime(now.minusMillis(1)) - .build(); - assertThrows(IllegalArgumentException.class, () -> validator.validate(entry)); - } - - @Test - void testEntryDateNow() { - TransactionOutboxEntry entry = - TransactionOutboxEntry.builder() - .id("FOO") - .invocation(COMPLEX_INVOCATION) - .nextAttemptTime(now) - .build(); - assertThrows(IllegalArgumentException.class, () -> validator.validate(entry)); - } - @Test void testEntryDateFuture() { TransactionOutboxEntry entry = diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 2e48f70f..1eacda39 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -22,12 +22,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.LongStream; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,11 +35,85 @@ public abstract class AbstractAcceptanceTest extends BaseTest { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptanceTest.class); - private final ExecutorService unreliablePool = - new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16)); + + private ExecutorService unreliablePool; + private ExecutorService singleThreadPool; private static final Random random = new Random(); + @BeforeEach + void beforeEachBase() { + unreliablePool = + new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16)); + singleThreadPool = Executors.newSingleThreadExecutor(); + } + + @AfterEach + void afterEachBase() throws InterruptedException { + unreliablePool.shutdown(); + singleThreadPool.shutdown(); + assertTrue(unreliablePool.awaitTermination(30, SECONDS)); + assertTrue(singleThreadPool.awaitTermination(30, SECONDS)); + } + + @Test + final void sequencing() throws Exception { + int countPerTopic = 50; + + CountDownLatch latch = new CountDownLatch(countPerTopic * 2); + TransactionManager transactionManager = txManager(); + OrderedEntryListener orderedEntryListener = + new OrderedEntryListener(latch, new CountDownLatch(1)); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .submitter(Submitter.withExecutor(unreliablePool)) + .attemptFrequency(Duration.ofMillis(500)) + .instantiator(new RandomFailingInstantiator()) + .persistor(persistor()) + .listener(orderedEntryListener) + .initializeImmediately(false) + .build(); + + outbox.initialize(); + clearOutbox(); + + withRunningFlusher( + outbox, + () -> { + for (int ix = 1; ix <= countPerTopic; ix++) { + final int i = ix; + transactionManager.inTransaction( + () -> { + outbox + .with() + .ordered("topic1") + .schedule(InterfaceProcessor.class) + .process(i, "topic1"); + outbox + .with() + .ordered("topic2") + .schedule(InterfaceProcessor.class) + .process(i, "topic2"); + }); + } + assertTrue(latch.await(30, SECONDS)); + var indexes = LongStream.range(1, countPerTopic + 1).boxed().collect(Collectors.toList()); + assertEquals( + indexes, + orderedEntryListener.getSuccesses().stream() + .filter(it -> "topic1".equals(it.getTopic())) + .map(TransactionOutboxEntry::getSequence) + .collect(Collectors.toList())); + assertEquals( + indexes, + orderedEntryListener.getSuccesses().stream() + .filter(it -> "topic2".equals(it.getTopic())) + .map(TransactionOutboxEntry::getSequence) + .collect(Collectors.toList())); + }); + } + /** * Uses a simple direct transaction manager and connection manager and attempts to fire an * interface using a custom instantiator. @@ -366,7 +440,7 @@ final void retryBehaviour() throws Exception { .transactionManager(transactionManager) .persistor(Persistor.forDialect(connectionDetails().dialect())) .instantiator(new FailingInstantiator(attempts)) - .submitter(Submitter.withExecutor(unreliablePool)) + .submitter(Submitter.withExecutor(singleThreadPool)) .attemptFrequency(Duration.ofMillis(500)) .listener(new LatchListener(latch)) .build(); @@ -379,7 +453,8 @@ final void retryBehaviour() throws Exception { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); assertTrue(latch.await(15, SECONDS)); - }); + }, + singleThreadPool); } @Test @@ -437,7 +512,7 @@ final void lastAttemptTime_updatesEveryTime() throws Exception { .transactionManager(transactionManager) .persistor(Persistor.forDialect(connectionDetails().dialect())) .instantiator(new FailingInstantiator(attempts)) - .submitter(Submitter.withExecutor(unreliablePool)) + .submitter(Submitter.withExecutor(singleThreadPool)) .attemptFrequency(Duration.ofMillis(500)) .listener(orderedEntryListener) .blockAfterAttempts(2) @@ -450,30 +525,30 @@ final void lastAttemptTime_updatesEveryTime() throws Exception { () -> { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); - assertTrue(blockLatch.await(10, SECONDS)); + assertTrue(blockLatch.await(20, SECONDS), "Entry was not blocked"); assertTrue( (Boolean) transactionManager.inTransactionReturns( tx -> outbox.unblock(orderedEntryListener.getBlocked().getId()))); - assertTrue(successLatch.await(10, SECONDS)); - var orderedEntryEvents = orderedEntryListener.getOrderedEntries(); - log.info("The entry life cycle is: {}", orderedEntryEvents); + assertTrue(successLatch.await(20, SECONDS), "Timeout waiting for success"); + var events = orderedEntryListener.getEvents(); + log.info("The entry life cycle is: {}", events); // then we are only dealing in terms of a single outbox entry. - assertEquals( - 1, orderedEntryEvents.stream().map(TransactionOutboxEntry::getId).distinct().count()); + assertEquals(1, events.stream().map(TransactionOutboxEntry::getId).distinct().count()); // the first, scheduled entry has no lastAttemptTime set - assertNull(orderedEntryEvents.get(0).getLastAttemptTime()); + assertNull(events.get(0).getLastAttemptTime()); // all subsequent entries (2 x failures (second of which 'blocks'), 1x success updates // against db) have a distinct lastAttemptTime set on them. assertEquals( 3, - orderedEntryEvents.stream() + events.stream() .skip(1) .map(TransactionOutboxEntry::getLastAttemptTime) .distinct() .count()); - }); + }, + singleThreadPool); } /** @@ -492,6 +567,7 @@ final void blockAndThenUnblockForRetry() throws Exception { .transactionManager(transactionManager) .persistor(Persistor.forDialect(connectionDetails().dialect())) .instantiator(new FailingInstantiator(attempts)) + .submitter(Submitter.withExecutor(singleThreadPool)) .attemptFrequency(Duration.ofMillis(500)) .listener(latchListener) .blockAfterAttempts(2) @@ -504,13 +580,14 @@ final void blockAndThenUnblockForRetry() throws Exception { () -> { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); - assertTrue(blockLatch.await(3, SECONDS)); + assertTrue(blockLatch.await(5, SECONDS)); assertTrue( (Boolean) transactionManager.inTransactionReturns( tx -> outbox.unblock(latchListener.getBlocked().getId()))); - assertTrue(successLatch.await(3, SECONDS)); - }); + assertTrue(successLatch.await(5, SECONDS)); + }, + singleThreadPool); } /** Hammers high-volume, frequently failing tasks to ensure that they all get run. */ diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java index 7cf9ed2f..617c14a3 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java @@ -4,17 +4,23 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.SQLException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import lombok.Builder; import lombok.Value; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @Slf4j public abstract class BaseTest { protected HikariDataSource dataSource; + private ExecutorService flushExecutor; @BeforeEach final void baseBeforeEach() { @@ -24,10 +30,13 @@ final void baseBeforeEach() { config.setPassword(connectionDetails().password()); config.addDataSourceProperty("cachePrepStmts", "true"); dataSource = new HikariDataSource(config); + flushExecutor = Executors.newFixedThreadPool(4); } @AfterEach - final void baseAfterEach() { + final void baseAfterEach() throws InterruptedException { + flushExecutor.shutdown(); + Assertions.assertTrue(flushExecutor.awaitTermination(30, TimeUnit.SECONDS)); dataSource.close(); } @@ -65,6 +74,11 @@ protected void clearOutbox() { protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable runnable) throws Exception { + withRunningFlusher(outbox, runnable, flushExecutor); + } + + protected void withRunningFlusher( + TransactionOutbox outbox, ThrowingRunnable runnable, Executor executor) throws Exception { Thread backgroundThread = new Thread( () -> { @@ -72,9 +86,9 @@ protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable run try { // Keep flushing work until there's nothing left to flush //noinspection StatementWithEmptyBody - while (outbox.flush()) {} + while (outbox.flush(executor)) {} } catch (Exception e) { - log.error("Error flushing transaction outbox. Pausing", e); + log.error("Error flushing transaction outbox", e); } try { //noinspection BusyWait diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java index e91fcf56..db6245bc 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java @@ -6,39 +6,46 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * Collects an ordered list of all entry events (*excluding blocked events) that have hit this * listener until a specified number of blocks / successes have occurred. */ +@Slf4j public final class OrderedEntryListener implements TransactionOutboxListener { private final CountDownLatch successLatch; private final CountDownLatch blockedLatch; @Getter private volatile TransactionOutboxEntry blocked; - private final CopyOnWriteArrayList orderedEntries; + private final CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList successes = + new CopyOnWriteArrayList<>(); OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) { this.successLatch = successLatch; this.blockedLatch = blockedLatch; - orderedEntries = new CopyOnWriteArrayList<>(); } @Override public void scheduled(TransactionOutboxEntry entry) { - orderedEntries.add(from(entry)); + events.add(from(entry)); } @Override public void success(TransactionOutboxEntry entry) { - orderedEntries.add(from(entry)); + var copy = from(entry); + events.add(copy); + successes.add(copy); + log.info( + "Received success #{}. Counting down at {}", successes.size(), successLatch.getCount()); successLatch.countDown(); } @Override public void failure(TransactionOutboxEntry entry, Throwable cause) { - orderedEntries.add(from(entry)); + events.add(from(entry)); } @Override @@ -50,28 +57,21 @@ public void blocked(TransactionOutboxEntry entry, Throwable cause) { } /** - * Retrieve an unmodifiable copy of {@link #orderedEntries}. Beware, expectation is that this does - * not/ should not get accessed until the correct number of {@link - * #success(TransactionOutboxEntry)} and {@link #blocked(TransactionOutboxEntry, Throwable)}} - * counts have occurred. + * Retrieve an unmodifiable copy of {@link #events}. Beware, expectation is that this does not/ + * should not get accessed until the correct number of {@link #success(TransactionOutboxEntry)} + * and {@link #blocked(TransactionOutboxEntry, Throwable)}} counts have occurred. * * @return unmodifiable list of ordered outbox entry events. */ - public List getOrderedEntries() { - return List.copyOf(orderedEntries); + public List getEvents() { + return List.copyOf(events); + } + + public List getSuccesses() { + return List.copyOf(successes); } private TransactionOutboxEntry from(TransactionOutboxEntry entry) { - return TransactionOutboxEntry.builder() - .id(entry.getId()) - .uniqueRequestId(entry.getUniqueRequestId()) - .invocation(entry.getInvocation()) - .lastAttemptTime(entry.getLastAttemptTime()) - .nextAttemptTime(entry.getNextAttemptTime()) - .attempts(entry.getAttempts()) - .blocked(entry.isBlocked()) - .processed(entry.isProcessed()) - .version(entry.getVersion()) - .build(); + return entry.toBuilder().build(); } }