From 10d67089e0d00cb46d21a84498c389f52d1b1952 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 16 Mar 2024 00:49:01 +0000 Subject: [PATCH 01/11] Record partition and sequence on records --- .../transactionoutbox/DefaultDialect.java | 23 +++++ .../transactionoutbox/DefaultPersistor.java | 88 +++++++++++++++---- .../transactionoutbox/TransactionOutbox.java | 2 + .../TransactionOutboxEntry.java | 21 +++-- .../TransactionOutboxImpl.java | 24 +++-- .../QuarkusTransactionManager.java | 52 ++++++----- .../quarkus/QuarkusTransactionManager.java | 52 ++++++----- .../SpringTransactionManager.java | 50 +++++++---- .../spring/SpringTransactionManager.java | 50 +++++++---- .../testing/AbstractAcceptanceTest.java | 27 ++++++ 10 files changed, 283 insertions(+), 106 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index 53d512bb..68f48fb8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -120,6 +120,29 @@ static final class Builder { 8, "Update length of invocation column on outbox for MySQL dialects only.", "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT")); + migrations.put( + 9, + new Migration( + 9, + "Add partition", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN partition VARCHAR(255) NULL")); + migrations.put( + 10, + new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); + migrations.put( + 11, + new Migration( + 10, + "Add sequence table", + "CREATE TABLE TXNO_SEQUENCE (partition VARCHAR(255) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (partition, seq))")); + migrations.put( + 12, new Migration(12, "Drop index", "DROP INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX")); + migrations.put( + 13, + new Migration( + 13, + "Modify flush index to support ordering", + "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blocked, nextAttemptTime, partition, seq)")); } Builder setMigration(Migration migration) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 5a7c37ed..8a5f78f3 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -37,7 +37,7 @@ public class DefaultPersistor implements Persistor, Validatable { private static final String ALL_FIELDS = - "id, uniqueRequestId, invocation, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; + "id, uniqueRequestId, invocation, partition, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; /** * @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write @@ -110,26 +110,29 @@ public void writeSchema(Writer writer) { public void save(Transaction tx, TransactionOutboxEntry entry) throws SQLException, AlreadyScheduledException { var insertSql = - "INSERT INTO " + tableName + " (" + ALL_FIELDS + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "INSERT INTO " + + tableName + + " (" + + ALL_FIELDS + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; var writer = new StringWriter(); serializer.serializeInvocation(entry.getInvocation(), writer); + if (entry.getPartition() != null) { + setNextSequence(tx, entry); + log.info( + "Assigned sequence number {} to partition {}", entry.getSequence(), entry.getPartition()); + } + PreparedStatement stmt = tx.prepareBatchStatement(insertSql); + setupInsert(entry, writer, stmt); if (entry.getUniqueRequestId() == null) { - PreparedStatement stmt = tx.prepareBatchStatement(insertSql); - setupInsert(entry, writer, stmt); stmt.addBatch(); log.debug("Inserted {} in batch", entry.description()); } else { - //noinspection resource - try (PreparedStatement stmt = tx.connection().prepareStatement(insertSql)) { - setupInsert(entry, writer, stmt); + try { stmt.executeUpdate(); log.debug("Inserted {} immediately", entry.description()); - } catch (SQLIntegrityConstraintViolationException e) { - throw new AlreadyScheduledException( - "Request " + entry.description() + " already exists", e); } catch (Exception e) { - if (e.getClass().getName().equals("org.postgresql.util.PSQLException") - && e.getMessage().contains("constraint")) { + if (indexViolation(e)) { throw new AlreadyScheduledException( "Request " + entry.description() + " already exists", e); } @@ -138,19 +141,61 @@ public void save(Transaction tx, TransactionOutboxEntry entry) } } + private void setNextSequence(Transaction tx, TransactionOutboxEntry entry) throws SQLException { + //noinspection resource + var seqSelect = + tx.prepareBatchStatement("SELECT seq FROM TXNO_SEQUENCE WHERE partition = ? FOR UPDATE"); + seqSelect.setString(1, entry.getPartition()); + try (ResultSet rs = seqSelect.executeQuery()) { + if (rs.next()) { + entry.setSequence(rs.getLong(1) + 1L); + //noinspection resource + var seqUpdate = + tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE partition = ?"); + seqUpdate.setLong(1, entry.getSequence()); + seqUpdate.setString(2, entry.getPartition()); + seqUpdate.executeUpdate(); + } else { + try { + entry.setSequence(1L); + //noinspection resource + var seqInsert = + tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (partition, seq) VALUES (?, ?)"); + seqInsert.setString(1, entry.getPartition()); + seqInsert.setLong(2, entry.getSequence()); + seqInsert.executeUpdate(); + } catch (Exception e) { + if (indexViolation(e)) { + setNextSequence(tx, entry); + } else { + throw e; + } + } + } + } + } + + private boolean indexViolation(Exception e) { + return (e instanceof SQLIntegrityConstraintViolationException) + || (e.getClass().getName().equals("org.postgresql.util.PSQLException") + && e.getMessage().contains("constraint")); + } + private void setupInsert( TransactionOutboxEntry entry, StringWriter writer, PreparedStatement stmt) throws SQLException { stmt.setString(1, entry.getId()); stmt.setString(2, entry.getUniqueRequestId()); stmt.setString(3, writer.toString()); + stmt.setString(4, entry.getPartition() == null ? "" : entry.getPartition()); + stmt.setLong(5, entry.getSequence()); stmt.setTimestamp( - 4, entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime())); - stmt.setTimestamp(5, Timestamp.from(entry.getNextAttemptTime())); - stmt.setInt(6, entry.getAttempts()); - stmt.setBoolean(7, entry.isBlocked()); - stmt.setBoolean(8, entry.isProcessed()); - stmt.setInt(9, entry.getVersion()); + 6, entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime())); + stmt.setTimestamp(7, Timestamp.from(entry.getNextAttemptTime())); + stmt.setInt(8, entry.getAttempts()); + stmt.setBoolean(9, entry.isBlocked()); + stmt.setBoolean(10, entry.isProcessed()); + stmt.setInt(11, entry.getVersion()); } @Override @@ -308,12 +353,19 @@ private List gatherResults(int batchSize, PreparedStatem } private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOException { + String partition = rs.getString("partition"); + Long sequence = rs.getLong("seq"); + if (rs.wasNull()) { + sequence = null; + } try (Reader invocationStream = rs.getCharacterStream("invocation")) { TransactionOutboxEntry entry = TransactionOutboxEntry.builder() .id(rs.getString("id")) .uniqueRequestId(rs.getString("uniqueRequestId")) .invocation(serializer.deserializeInvocation(invocationStream)) + .partition("".equals(partition) ? null : partition) + .sequence(sequence) .lastAttemptTime( rs.getTimestamp("lastAttemptTime") == null ? null diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index f240a2e7..d188c77b 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -312,6 +312,8 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId); + ParameterizedScheduleBuilder ordered(String partition); + /** * Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters * to the request as configured using {@link TransactionOutbox#with()}. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index f86e4248..19f8cc0c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -4,11 +4,7 @@ import java.time.Instant; import java.util.Arrays; -import lombok.AccessLevel; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import lombok.experimental.SuperBuilder; /** @@ -36,6 +32,21 @@ public class TransactionOutboxEntry implements Validatable { @Getter private final String uniqueRequestId; + /** + * @param partition An optional scope for ordered sequencing. + */ + @SuppressWarnings("JavaDoc") + @Getter + private final String partition; + + /** + * @param sequence The ordered sequence within the {@code partition}. + */ + @SuppressWarnings("JavaDoc") + @Getter + @Setter + private Long sequence; + /** * @param invocation The method invocation to perform. * @return The method invocation to perform. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 022f5145..7cf255d3 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -108,7 +108,7 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null); + return schedule(clazz, null, null); } @Override @@ -222,7 +222,7 @@ public boolean unblock(String entryId, Object transactionContext) { } } - private T schedule(Class clazz, String uniqueRequestId) { + private T schedule(Class clazz, String uniqueRequestId, String orderPartition) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -238,7 +238,8 @@ private T schedule(Class clazz, String uniqueRequestId) { extracted.getMethodName(), extracted.getParameters(), extracted.getArgs(), - uniqueRequestId); + uniqueRequestId, + orderPartition); validator.validate(entry); persistor.save(extracted.getTransaction(), entry); extracted @@ -313,7 +314,12 @@ private void invoke(TransactionOutboxEntry entry, Transaction transaction) } private TransactionOutboxEntry newEntry( - Class clazz, String methodName, Class[] params, Object[] args, String uniqueRequestId) { + Class clazz, + String methodName, + Class[] params, + Object[] args, + String uniqueRequestId, + String partition) { return TransactionOutboxEntry.builder() .id(UUID.randomUUID().toString()) .invocation( @@ -326,6 +332,7 @@ private TransactionOutboxEntry newEntry( .lastAttemptTime(null) .nextAttemptTime(after(attemptFrequency)) .uniqueRequestId(uniqueRequestId) + .partition(partition) .build(); } @@ -410,6 +417,7 @@ public TransactionOutboxImpl build() { private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleBuilder { private String uniqueRequestId; + private String orderPartition; @Override public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { @@ -417,12 +425,18 @@ public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { return this; } + @Override + public ParameterizedScheduleBuilder ordered(String partition) { + this.orderPartition = partition; + return this; + } + @Override public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, orderPartition); } } } diff --git a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java index f79356b1..2fbbd53f 100644 --- a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java +++ b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java @@ -15,6 +15,8 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; import javax.sql.DataSource; /** @@ -67,6 +69,8 @@ public T requireTransactionReturns( private final class CdiTransaction implements Transaction { + private final Map preparedStatements = new HashMap<>(); + public Connection connection() { try { return datasource.getConnection(); @@ -77,26 +81,34 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); - - tsr.registerInterposedSynchronization( - new Synchronization() { - @Override - public void beforeCompletion() { - if (preparedStatement.getBatchCount() != 0) { - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - - return preparedStatement; + return preparedStatements.computeIfAbsent( + sql, + s -> + Utils.uncheckedly( + () -> { + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> + BatchCountingStatementHandler.countBatches( + connection().prepareStatement(sql))); + + tsr.registerInterposedSynchronization( + new Synchronization() { + @Override + public void beforeCompletion() { + if (preparedStatement.getBatchCount() != 0) { + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + + return preparedStatement; + })); } @Override diff --git a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java index f93a2bee..e3cec0e6 100644 --- a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java +++ b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java @@ -17,6 +17,8 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; import javax.sql.DataSource; /** Transaction manager which uses cdi and quarkus. */ @@ -66,6 +68,8 @@ public T requireTransactionReturns( private final class CdiTransaction implements Transaction { + private final Map preparedStatements = new HashMap<>(); + public Connection connection() { try { return datasource.getConnection(); @@ -76,26 +80,34 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); - - tsr.registerInterposedSynchronization( - new Synchronization() { - @Override - public void beforeCompletion() { - if (preparedStatement.getBatchCount() != 0) { - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - - return preparedStatement; + return preparedStatements.computeIfAbsent( + sql, + s -> + Utils.uncheckedly( + () -> { + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> + BatchCountingStatementHandler.countBatches( + connection().prepareStatement(sql))); + + tsr.registerInterposedSynchronization( + new Synchronization() { + @Override + public void beforeCompletion() { + if (preparedStatement.getBatchCount() != 0) { + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + + return preparedStatement; + })); } @Override diff --git a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java index 9685db7d..14bd3a08 100644 --- a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java +++ b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java @@ -9,6 +9,8 @@ import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.PreparedStatement; +import java.util.HashMap; +import java.util.Map; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -81,6 +83,8 @@ public T requireTransactionReturns( private final class SpringTransaction implements Transaction { + private final Map preparedStatements = new HashMap<>(); + @Override public Connection connection() { return DataSourceUtils.getConnection(dataSource); @@ -88,25 +92,33 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); - TransactionSynchronizationManager.registerSynchronization( - new TransactionSynchronization() { - @Override - public void beforeCommit(boolean readOnly) { - if (preparedStatement.getBatchCount() != 0) { - log.debug("Flushing batches"); - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - return preparedStatement; + return preparedStatements.computeIfAbsent( + sql, + s -> + Utils.uncheckedly( + () -> { + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> + BatchCountingStatementHandler.countBatches( + connection().prepareStatement(sql))); + TransactionSynchronizationManager.registerSynchronization( + new TransactionSynchronization() { + @Override + public void beforeCommit(boolean readOnly) { + if (preparedStatement.getBatchCount() != 0) { + log.debug("Flushing batches"); + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + return preparedStatement; + })); } @Override diff --git a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java index 51a0023f..738aad6a 100644 --- a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java +++ b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java @@ -10,6 +10,8 @@ import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.PreparedStatement; +import java.util.HashMap; +import java.util.Map; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -79,6 +81,8 @@ public T requireTransactionReturns( private final class SpringTransaction implements Transaction { + private final Map preparedStatements = new HashMap<>(); + @Override public Connection connection() { return DataSourceUtils.getConnection(dataSource); @@ -86,25 +90,33 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); - TransactionSynchronizationManager.registerSynchronization( - new TransactionSynchronization() { - @Override - public void beforeCommit(boolean readOnly) { - if (preparedStatement.getBatchCount() != 0) { - log.debug("Flushing batches"); - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - return preparedStatement; + return preparedStatements.computeIfAbsent( + sql, + s -> + Utils.uncheckedly( + () -> { + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> + BatchCountingStatementHandler.countBatches( + connection().prepareStatement(sql))); + TransactionSynchronizationManager.registerSynchronization( + new TransactionSynchronization() { + @Override + public void beforeCommit(boolean readOnly) { + if (preparedStatement.getBatchCount() != 0) { + log.debug("Flushing batches"); + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + return preparedStatement; + })); } @Override diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 2e48f70f..552d78d6 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -40,6 +40,33 @@ public abstract class AbstractAcceptanceTest extends BaseTest { private static final Random random = new Random(); + @Test + final void sequencing() { + TransactionManager transactionManager = txManager(); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .instantiator( + Instantiator.using( + clazz -> + (InterfaceProcessor) + (foo, bar) -> LOGGER.info("Processing ({}, {})", foo, bar))) + .persistor(persistor()) + .initializeImmediately(false) + .build(); + + outbox.initialize(); + clearOutbox(); + + transactionManager.inTransaction( + () -> + outbox + .with() + .ordered("my-partition") + .schedule(InterfaceProcessor.class) + .process(3, "Whee")); + } + /** * Uses a simple direct transaction manager and connection manager and attempts to fire an * interface using a custom instantiator. From 707960398d44177a9a23201f4661f036b947d4c7 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 16 Mar 2024 16:20:16 +0000 Subject: [PATCH 02/11] WIP --- .../transactionoutbox/DefaultDialect.java | 12 +-- .../transactionoutbox/DefaultPersistor.java | 87 ++++++++++++++----- .../gruelbox/transactionoutbox/Dialect.java | 11 ++- .../gruelbox/transactionoutbox/Persistor.java | 3 + .../transactionoutbox/TransactionOutbox.java | 52 ++++++++++- .../TransactionOutboxEntry.java | 12 +-- .../TransactionOutboxImpl.java | 57 +++++++++--- .../testing/AbstractAcceptanceTest.java | 2 +- 8 files changed, 191 insertions(+), 45 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index 68f48fb8..ef1ef726 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -23,6 +23,7 @@ static Builder builder(String name) { @Getter private final String name; @Getter private final boolean supportsSkipLock; + @Getter private final boolean supportsWindowFunctions; @Getter private final String deleteExpired; @Getter private final String limitCriteria; @Getter private final String checkSql; @@ -56,6 +57,7 @@ public Stream getMigrations() { static final class Builder { private final String name; private boolean supportsSkipLock = false; + private boolean supportsWindowFunctions = false; private String deleteExpired = "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?"; private String limitCriteria = " LIMIT ?"; @@ -124,8 +126,8 @@ static final class Builder { 9, new Migration( 9, - "Add partition", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN partition VARCHAR(255) NULL")); + "Add topic", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NULL")); migrations.put( 10, new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); @@ -134,7 +136,7 @@ static final class Builder { new Migration( 10, "Add sequence table", - "CREATE TABLE TXNO_SEQUENCE (partition VARCHAR(255) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (partition, seq))")); + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))")); migrations.put( 12, new Migration(12, "Drop index", "DROP INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX")); migrations.put( @@ -142,7 +144,7 @@ static final class Builder { new Migration( 13, "Modify flush index to support ordering", - "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blocked, nextAttemptTime, partition, seq)")); + "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blocked, nextAttemptTime, topic, seq)")); } Builder setMigration(Migration migration) { @@ -160,7 +162,7 @@ Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { Dialect build() { return new DefaultDialect( - name, supportsSkipLock, deleteExpired, limitCriteria, checkSql, migrations.values()) { + name, supportsSkipLock, supportsWindowFunctions, deleteExpired, limitCriteria, checkSql, migrations.values()) { @Override public String booleanValue(boolean criteriaValue) { if (booleanValueFrom != null) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 8a5f78f3..f3258138 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -12,8 +12,8 @@ import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; +import java.util.*; + import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -37,7 +37,7 @@ public class DefaultPersistor implements Persistor, Validatable { private static final String ALL_FIELDS = - "id, uniqueRequestId, invocation, partition, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; + "id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; /** * @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write @@ -117,10 +117,10 @@ public void save(Transaction tx, TransactionOutboxEntry entry) + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; var writer = new StringWriter(); serializer.serializeInvocation(entry.getInvocation(), writer); - if (entry.getPartition() != null) { + if (entry.getTopic() != null) { setNextSequence(tx, entry); log.info( - "Assigned sequence number {} to partition {}", entry.getSequence(), entry.getPartition()); + "Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); } PreparedStatement stmt = tx.prepareBatchStatement(insertSql); setupInsert(entry, writer, stmt); @@ -144,24 +144,24 @@ public void save(Transaction tx, TransactionOutboxEntry entry) private void setNextSequence(Transaction tx, TransactionOutboxEntry entry) throws SQLException { //noinspection resource var seqSelect = - tx.prepareBatchStatement("SELECT seq FROM TXNO_SEQUENCE WHERE partition = ? FOR UPDATE"); - seqSelect.setString(1, entry.getPartition()); + tx.prepareBatchStatement("SELECT seq FROM TXNO_SEQUENCE WHERE topic = ? FOR UPDATE"); + seqSelect.setString(1, entry.getTopic()); try (ResultSet rs = seqSelect.executeQuery()) { if (rs.next()) { entry.setSequence(rs.getLong(1) + 1L); //noinspection resource var seqUpdate = - tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE partition = ?"); + tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE topic = ?"); seqUpdate.setLong(1, entry.getSequence()); - seqUpdate.setString(2, entry.getPartition()); + seqUpdate.setString(2, entry.getTopic()); seqUpdate.executeUpdate(); } else { try { entry.setSequence(1L); //noinspection resource var seqInsert = - tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (partition, seq) VALUES (?, ?)"); - seqInsert.setString(1, entry.getPartition()); + tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (topic, seq) VALUES (?, ?)"); + seqInsert.setString(1, entry.getTopic()); seqInsert.setLong(2, entry.getSequence()); seqInsert.executeUpdate(); } catch (Exception e) { @@ -187,7 +187,7 @@ private void setupInsert( stmt.setString(1, entry.getId()); stmt.setString(2, entry.getUniqueRequestId()); stmt.setString(3, writer.toString()); - stmt.setString(4, entry.getPartition() == null ? "" : entry.getPartition()); + stmt.setString(4, entry.getTopic() == null ? "" : entry.getTopic()); stmt.setLong(5, entry.getSequence()); stmt.setTimestamp( 6, entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime())); @@ -319,14 +319,63 @@ public List selectBatch(Transaction tx, int batchSize, I + dialect.booleanValue(false) + " AND processed = " + dialect.booleanValue(false) + + " AND topic = ''" + dialect.getLimitCriteria() + forUpdate)) { stmt.setTimestamp(1, Timestamp.from(now)); stmt.setInt(2, batchSize); - return gatherResults(batchSize, stmt); + var result = new ArrayList(batchSize); + gatherResults(batchSize, stmt, result); + return result; + } + } + + @Override + public Set selectBatchOrdered(final Transaction tx, final int batchSize, final Instant now) throws Exception { + try (PreparedStatement stmt = + tx.connection() + .prepareStatement( + dialect.isSupportsWindowFunctions() + ? batchOrderedSqlWindowFunctions() + : batchOrderedSqlNonWindowFunctions())) { + stmt.setTimestamp(1, Timestamp.from(now)); + stmt.setInt(2, batchSize); + var result = new HashSet(); + gatherResults(batchSize, stmt, result); + return result; } } + private String batchOrderedSqlWindowFunctions() { + var subquery = String.format( + "SELECT ROW_NUMBER() OVER (PARTITION BY topic ORDER BY seq) AS rn, %s FROM %s WHERE topic <> '' AND processed = %s", + ALL_FIELDS, + tableName, + dialect.booleanValue(false)); + return String.format( + "SELECT %s FROM (%s) t WHERE rn = 1 AND nextAttemptTime < ? AND topic <> '' and processed = %s %s", + ALL_FIELDS, + subquery, + dialect.booleanValue(false) + dialect.getLimitCriteria()); + } + + private String batchOrderedSqlNonWindowFunctions() { + var subquery = String.format( + "SELECT %s FROM (SELECT topic, MIN(seq) AS min_seq FROM %s WHERE topic <> '' AND processed = %s", + ALL_FIELDS, + tableName, + dialect.booleanValue(false)); + return String.format( + "SELECT %s FROM %s m JOIN (%s) t ON m.topic = t.topic AND m.seq = t.min_seq WHERE nextAttemptTime < ? AND topic <> '' and processed = %s %s", + ALL_FIELDS, + tableName, + subquery, + dialect.booleanValue(false), + dialect.getLimitCriteria()); + } + + @Override public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) throws Exception { @@ -340,20 +389,18 @@ public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) } } - private List gatherResults(int batchSize, PreparedStatement stmt) + private void gatherResults(int batchSize, PreparedStatement stmt, Collection output) throws SQLException, IOException { try (ResultSet rs = stmt.executeQuery()) { - ArrayList result = new ArrayList<>(batchSize); while (rs.next()) { - result.add(map(rs)); + output.add(map(rs)); } - log.debug("Found {} results", result.size()); - return result; + log.debug("Found {} results", output.size()); } } private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOException { - String partition = rs.getString("partition"); + String topic = rs.getString("topic"); Long sequence = rs.getLong("seq"); if (rs.wasNull()) { sequence = null; @@ -364,7 +411,7 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio .id(rs.getString("id")) .uniqueRequestId(rs.getString("uniqueRequestId")) .invocation(serializer.deserializeInvocation(invocationStream)) - .partition("".equals(partition) ? null : partition) + .topic("".equals(topic) ? null : topic) .sequence(sequence) .lastAttemptTime( rs.getTimestamp("lastAttemptTime") == null diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 7f54401f..15874f00 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -15,6 +15,12 @@ public interface Dialect { */ boolean isSupportsSkipLock(); + /** + * @return True if window functions are supported. This improves performance when using ordered + * processing. + */ + boolean isSupportsWindowFunctions(); + /** * @return Format string for the SQL required to delete expired retained records. */ @@ -31,10 +37,11 @@ public interface Dialect { Stream getMigrations(); Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); - Dialect MY_SQL_8 = DefaultDialect.builder("MY_SQL_8").supportsSkipLock(true).build(); + Dialect MY_SQL_8 = DefaultDialect.builder("MY_SQL_8").supportsSkipLock(true).supportsWindowFunctions(true).build(); Dialect POSTGRESQL_9 = DefaultDialect.builder("POSTGRESQL_9") .supportsSkipLock(true) + .supportsWindowFunctions(true) .deleteExpired( "DELETE FROM {{table}} WHERE id IN (SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)") .changeMigration( @@ -46,12 +53,14 @@ public interface Dialect { Dialect H2 = DefaultDialect.builder("H2") + .supportsWindowFunctions(true) .changeMigration(5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId VARCHAR(250)") .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") .disableMigration(8) .build(); Dialect ORACLE = DefaultDialect.builder("ORACLE") + .supportsWindowFunctions(true) .supportsSkipLock(true) .deleteExpired( "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 AND ROWNUM <= ?") diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java index 8b03f157..6b3df028 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java @@ -2,6 +2,7 @@ import java.time.Instant; import java.util.List; +import java.util.Set; /** * Saves and loads {@link TransactionOutboxEntry}s. For most use cases, just use {@link @@ -105,6 +106,8 @@ static DefaultPersistor forDialect(Dialect dialect) { List selectBatch(Transaction tx, int batchSize, Instant now) throws Exception; + Set selectBatchOrdered(final Transaction tx, final int batchSize, final Instant now) throws Exception; + /** * Deletes records which have processed and passed their expiry time, in specified batch sizes. * diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index d188c77b..0ea6f2f4 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -90,6 +90,13 @@ static TransactionOutboxBuilder builder() { @SuppressWarnings("UnusedReturnValue") boolean flush(); + /** + * TODO + * @param executor + * @return + */ + boolean flushOrdered(Executor executor); + /** * Unblocks a blocked entry and resets the attempt count so that it will be retried again. * Requires an active transaction and a transaction manager that supports thread local context. @@ -312,7 +319,50 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId); - ParameterizedScheduleBuilder ordered(String partition); + /** + * Specifies that the request should be applied in a strictly-ordered fashion within the + * specified topic. + * + *

This is useful for a number of applications, such as feeding messages into + * an ordered pipeline such as a FIFO queue or Kafka topic, or for reliable data replication, + * such as when feeding a data warehouse or distributed cache.

+ * + *

Note that using this option has a number of consequences:

+ * + *
    + *
  • Requests are not processed immediately when submitting a request, as normal, and + * are not processed by {@link TransactionOutbox#flush()} either. Requests are processed + * up only when calling {@link TransactionOutbox#flushOrdered(Executor)}. As a result + * there will be increased delay between the source transaction being committed and the request + * being processed.
  • + *
  • If a request fails, no further requests will be processed in that topic until a + * subsequent retry allows the failing request to succeed, to preserve ordered processing. + * This means it is possible for topics to become entirely frozen in the event that a + * request fails repeatedly. For this reason, it is essential to use a + * {@link TransactionOutboxListener} to watch for failing requests and investigate + * quickly. Note that other topics will be unaffected.
  • + *
  • For the same reason, {@link TransactionOutboxBuilder#blockAfterAttempts} is + * ignored for all requests that use this option. The only safe way to recover from + * a failing request is to make the request succeed.
  • + *
  • A single topic can only be processed in single-threaded fashion, so if your requests + * use a small number of topics, scalability will be affected since the degree of + * parallelism will be reduced.
  • + *
  • Throughput is significantly reduced and database load increased more generally, even + * with larger numbers of topics, since records are only processed one-at-a-time rather + * than in batches, which is less optimised.
  • + *
  • In general, + * + * databases are not well optimised for this sort of thing. Don't expect miracles. If + * you need more throughput, you probably need to think twice about your architecture. + * Consider the event + * sourcing pattern, for example, where the message queue is the primary data + * store rather than a secondary, and remove the need for an outbox entirely.
  • + *
+ * + * @param topic a free-text string up to 250 characters. + * @return Builder. + */ + ParameterizedScheduleBuilder ordered(String topic); /** * Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index 19f8cc0c..ab0a6cfc 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -33,14 +33,14 @@ public class TransactionOutboxEntry implements Validatable { private final String uniqueRequestId; /** - * @param partition An optional scope for ordered sequencing. + * @param topic An optional scope for ordered sequencing. */ @SuppressWarnings("JavaDoc") @Getter - private final String partition; + private final String topic; /** - * @param sequence The ordered sequence within the {@code partition}. + * @param sequence The ordered sequence within the {@code topic}. */ @SuppressWarnings("JavaDoc") @Getter @@ -124,7 +124,7 @@ public String description() { if (!this.initialized) { String description = String.format( - "%s.%s(%s) [%s]%s", + "%s.%s(%s) [%s]%s%s", invocation.getClassName(), invocation.getMethodName(), invocation.getArgs() == null @@ -133,7 +133,8 @@ public String description() { .map(this::stringify) .collect(joining(", ")), id, - uniqueRequestId == null ? "" : " uid=[" + uniqueRequestId + "]"); + uniqueRequestId == null ? "" : " uid=[" + uniqueRequestId + "]", + topic == null ? "" : " seq=[" + topic + "/" + sequence + "]"); this.description = description; this.initialized = true; return description; @@ -160,6 +161,7 @@ private String stringify(Object o) { public void validate(Validator validator) { validator.notNull("id", id); validator.nullOrNotBlank("uniqueRequestId", uniqueRequestId); + validator.nullOrNotBlank("topic", topic); validator.notNull("invocation", invocation); validator.inFuture("nextAttemptTime", nextAttemptTime); validator.positiveOrZero("attempts", attempts); diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 7cf255d3..46b42bf4 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -11,9 +11,9 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import lombok.ToString; @@ -153,6 +153,37 @@ private List flush(Instant now) { return batch; } + @Override + public boolean flushOrdered(Executor executor) { + return transactionManager.inTransactionReturns( + tx -> { + Set failingTopics = new HashSet<>(); + Set lastSetOfResults = new HashSet<>(); + int count = 0; + do { + var nextSetOfResults = uncheckedly(() -> persistor.selectBatchOrdered(tx, flushBatchSize, clockProvider.get().instant())); + var batch = new ArrayList>(); + for (var request : nextSetOfResults) { + if (failingTopics.contains(request.getTopic())) { + continue; + } + if (lastSetOfResults.contains(request)) { + failingTopics.add(request.getTopic()); + } else { + count++; + batch.add(CompletableFuture.runAsync(() -> processNow(request), executor)); + } + } + lastSetOfResults = nextSetOfResults; + if (batch.isEmpty()) { + break; + } + CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).join(); + } while (count < flushBatchSize); + return count > 0; + }); + } + private void expireIdempotencyProtection(Instant now) { long totalRecordsDeleted = 0; int recordsDeleted; @@ -222,7 +253,7 @@ public boolean unblock(String entryId, Object transactionContext) { } } - private T schedule(Class clazz, String uniqueRequestId, String orderPartition) { + private T schedule(Class clazz, String uniqueRequestId, String topic) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -239,7 +270,7 @@ private T schedule(Class clazz, String uniqueRequestId, String orderParti extracted.getParameters(), extracted.getArgs(), uniqueRequestId, - orderPartition); + topic); validator.validate(entry); persistor.save(extracted.getTransaction(), entry); extracted @@ -247,7 +278,9 @@ private T schedule(Class clazz, String uniqueRequestId, String orderParti .addPostCommitHook( () -> { listener.scheduled(entry); - submitNow(entry); + if (entry.getTopic() == null) { + submitNow(entry); + } }); log.debug( "Scheduled {} for running after transaction commit", entry.description()); @@ -319,7 +352,7 @@ private TransactionOutboxEntry newEntry( Class[] params, Object[] args, String uniqueRequestId, - String partition) { + String topic) { return TransactionOutboxEntry.builder() .id(UUID.randomUUID().toString()) .invocation( @@ -332,7 +365,7 @@ private TransactionOutboxEntry newEntry( .lastAttemptTime(null) .nextAttemptTime(after(attemptFrequency)) .uniqueRequestId(uniqueRequestId) - .partition(partition) + .topic(topic) .build(); } @@ -417,7 +450,7 @@ public TransactionOutboxImpl build() { private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleBuilder { private String uniqueRequestId; - private String orderPartition; + private String topic; @Override public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { @@ -426,8 +459,8 @@ public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { } @Override - public ParameterizedScheduleBuilder ordered(String partition) { - this.orderPartition = partition; + public ParameterizedScheduleBuilder ordered(String topic) { + this.topic = topic; return this; } @@ -436,7 +469,7 @@ public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, orderPartition); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, topic); } } } diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 552d78d6..538ad5ff 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -62,7 +62,7 @@ final void sequencing() { () -> outbox .with() - .ordered("my-partition") + .ordered("my-topic") .schedule(InterfaceProcessor.class) .process(3, "Whee")); } From dbcfd484a5f612bcf8b42b27eb2ee01ac4b01f86 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 16 Mar 2024 22:47:23 +0000 Subject: [PATCH 03/11] Nailed the design I think --- .../transactionoutbox/DefaultDialect.java | 23 +++--- .../transactionoutbox/DefaultPersistor.java | 73 ++++++++----------- .../gruelbox/transactionoutbox/Dialect.java | 12 ++- .../gruelbox/transactionoutbox/Persistor.java | 6 +- .../transactionoutbox/StubPersistor.java | 12 +++ .../transactionoutbox/TransactionOutbox.java | 59 +++++++-------- .../TransactionOutboxEntry.java | 1 - .../TransactionOutboxImpl.java | 49 +++++++------ .../gruelbox/transactionoutbox/Validator.java | 8 -- .../testing/AbstractAcceptanceTest.java | 22 ++++-- .../transactionoutbox/testing/BaseTest.java | 5 +- 11 files changed, 135 insertions(+), 135 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index ef1ef726..4d448547 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -23,7 +23,6 @@ static Builder builder(String name) { @Getter private final String name; @Getter private final boolean supportsSkipLock; - @Getter private final boolean supportsWindowFunctions; @Getter private final String deleteExpired; @Getter private final String limitCriteria; @Getter private final String checkSql; @@ -57,7 +56,6 @@ public Stream getMigrations() { static final class Builder { private final String name; private boolean supportsSkipLock = false; - private boolean supportsWindowFunctions = false; private String deleteExpired = "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?"; private String limitCriteria = " LIMIT ?"; @@ -125,9 +123,7 @@ static final class Builder { migrations.put( 9, new Migration( - 9, - "Add topic", - "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NULL")); + 9, "Add topic", "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NULL")); migrations.put( 10, new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); @@ -138,13 +134,11 @@ static final class Builder { "Add sequence table", "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))")); migrations.put( - 12, new Migration(12, "Drop index", "DROP INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX")); - migrations.put( - 13, + 12, new Migration( - 13, - "Modify flush index to support ordering", - "CREATE INDEX IX_TXNO_OUTBOX_1 ON TXNO_OUTBOX (processed, blocked, nextAttemptTime, topic, seq)")); + 12, + "Add flush index to support ordering", + "CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)")); } Builder setMigration(Migration migration) { @@ -162,7 +156,12 @@ Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { Dialect build() { return new DefaultDialect( - name, supportsSkipLock, supportsWindowFunctions, deleteExpired, limitCriteria, checkSql, migrations.values()) { + name, + supportsSkipLock, + deleteExpired, + limitCriteria, + checkSql, + migrations.values()) { @Override public String booleanValue(boolean criteriaValue) { if (booleanValueFrom != null) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index f3258138..3cf6669d 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -13,7 +13,6 @@ import java.sql.Timestamp; import java.time.Instant; import java.util.*; - import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -119,8 +118,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry) serializer.serializeInvocation(entry.getInvocation(), writer); if (entry.getTopic() != null) { setNextSequence(tx, entry); - log.info( - "Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); + log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); } PreparedStatement stmt = tx.prepareBatchStatement(insertSql); setupInsert(entry, writer, stmt); @@ -325,57 +323,46 @@ public List selectBatch(Transaction tx, int batchSize, I stmt.setTimestamp(1, Timestamp.from(now)); stmt.setInt(2, batchSize); var result = new ArrayList(batchSize); - gatherResults(batchSize, stmt, result); + gatherResults(stmt, result); return result; } } @Override - public Set selectBatchOrdered(final Transaction tx, final int batchSize, final Instant now) throws Exception { + public List selectActiveTopics(Transaction tx) throws Exception { + var sql = "SELECT DISTINCT topic FROM %s WHERE topic <> '' AND processed = %s"; + String falseStr = dialect.booleanValue(false); + //noinspection resource try (PreparedStatement stmt = - tx.connection() - .prepareStatement( - dialect.isSupportsWindowFunctions() - ? batchOrderedSqlWindowFunctions() - : batchOrderedSqlNonWindowFunctions())) { - stmt.setTimestamp(1, Timestamp.from(now)); - stmt.setInt(2, batchSize); - var result = new HashSet(); - gatherResults(batchSize, stmt, result); + tx.connection().prepareStatement(String.format(sql, tableName, falseStr, falseStr))) { + var result = new ArrayList(); + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + result.add(rs.getString(1)); + } + } return result; } } - private String batchOrderedSqlWindowFunctions() { - var subquery = String.format( - "SELECT ROW_NUMBER() OVER (PARTITION BY topic ORDER BY seq) AS rn, %s FROM %s WHERE topic <> '' AND processed = %s", - ALL_FIELDS, - tableName, - dialect.booleanValue(false)); - return String.format( - "SELECT %s FROM (%s) t WHERE rn = 1 AND nextAttemptTime < ? AND topic <> '' and processed = %s %s", - ALL_FIELDS, - subquery, - dialect.booleanValue(false) - dialect.getLimitCriteria()); - } - - private String batchOrderedSqlNonWindowFunctions() { - var subquery = String.format( - "SELECT %s FROM (SELECT topic, MIN(seq) AS min_seq FROM %s WHERE topic <> '' AND processed = %s", - ALL_FIELDS, - tableName, - dialect.booleanValue(false)); - return String.format( - "SELECT %s FROM %s m JOIN (%s) t ON m.topic = t.topic AND m.seq = t.min_seq WHERE nextAttemptTime < ? AND topic <> '' and processed = %s %s", - ALL_FIELDS, - tableName, - subquery, - dialect.booleanValue(false), - dialect.getLimitCriteria()); + @Override + public Optional nextInTopic(Transaction tx, String topic) + throws Exception { + PreparedStatement stmt = + tx.prepareBatchStatement( + String.format( + "SELECT %s FROM %s " + + "WHERE topic = ? " + + "AND processed = %s " + + "ORDER BY seq ASC %s", + ALL_FIELDS, tableName, dialect.booleanValue(false), dialect.getLimitCriteria())); + stmt.setString(1, topic); + stmt.setInt(2, 1); + var results = new ArrayList(1); + gatherResults(stmt, results); + return results.stream().findFirst(); } - @Override public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) throws Exception { @@ -389,7 +376,7 @@ public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) } } - private void gatherResults(int batchSize, PreparedStatement stmt, Collection output) + private void gatherResults(PreparedStatement stmt, Collection output) throws SQLException, IOException { try (ResultSet rs = stmt.executeQuery()) { while (rs.next()) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 15874f00..294134c9 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -15,12 +15,6 @@ public interface Dialect { */ boolean isSupportsSkipLock(); - /** - * @return True if window functions are supported. This improves performance when using ordered - * processing. - */ - boolean isSupportsWindowFunctions(); - /** * @return Format string for the SQL required to delete expired retained records. */ @@ -37,7 +31,11 @@ public interface Dialect { Stream getMigrations(); Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); - Dialect MY_SQL_8 = DefaultDialect.builder("MY_SQL_8").supportsSkipLock(true).supportsWindowFunctions(true).build(); + Dialect MY_SQL_8 = + DefaultDialect.builder("MY_SQL_8") + .supportsSkipLock(true) + .supportsWindowFunctions(true) + .build(); Dialect POSTGRESQL_9 = DefaultDialect.builder("POSTGRESQL_9") .supportsSkipLock(true) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java index 6b3df028..1590911c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java @@ -2,7 +2,7 @@ import java.time.Instant; import java.util.List; -import java.util.Set; +import java.util.Optional; /** * Saves and loads {@link TransactionOutboxEntry}s. For most use cases, just use {@link @@ -106,7 +106,9 @@ static DefaultPersistor forDialect(Dialect dialect) { List selectBatch(Transaction tx, int batchSize, Instant now) throws Exception; - Set selectBatchOrdered(final Transaction tx, final int batchSize, final Instant now) throws Exception; + List selectActiveTopics(final Transaction tx) throws Exception; + + Optional nextInTopic(Transaction tx, String topic) throws Exception; /** * Deletes records which have processed and passed their expiry time, in specified batch sizes. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java index 45d8d449..4a827f5b 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java @@ -2,6 +2,7 @@ import java.time.Instant; import java.util.List; +import java.util.Optional; import lombok.Builder; /** Stub implementation of {@link Persistor}. */ @@ -45,6 +46,17 @@ public List selectBatch(Transaction tx, int batchSize, I return List.of(); } + @Override + public List selectActiveTopics(Transaction tx) throws Exception { + return List.of(); + } + + @Override + public Optional nextInTopic(Transaction tx, String topic) + throws Exception { + return Optional.empty(); + } + @Override public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now) { return 0; diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index 0ea6f2f4..faeab1ab 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -92,6 +92,7 @@ static TransactionOutboxBuilder builder() { /** * TODO + * * @param executor * @return */ @@ -323,40 +324,40 @@ interface ParameterizedScheduleBuilder { * Specifies that the request should be applied in a strictly-ordered fashion within the * specified topic. * - *

This is useful for a number of applications, such as feeding messages into - * an ordered pipeline such as a FIFO queue or Kafka topic, or for reliable data replication, - * such as when feeding a data warehouse or distributed cache.

+ *

This is useful for a number of applications, such as feeding messages into an ordered + * pipeline such as a FIFO queue or Kafka topic, or for reliable data replication, such as when + * feeding a data warehouse or distributed cache. * - *

Note that using this option has a number of consequences:

+ *

Note that using this option has a number of consequences: * *

    - *
  • Requests are not processed immediately when submitting a request, as normal, and - * are not processed by {@link TransactionOutbox#flush()} either. Requests are processed - * up only when calling {@link TransactionOutbox#flushOrdered(Executor)}. As a result - * there will be increased delay between the source transaction being committed and the request - * being processed.
  • - *
  • If a request fails, no further requests will be processed in that topic until a - * subsequent retry allows the failing request to succeed, to preserve ordered processing. - * This means it is possible for topics to become entirely frozen in the event that a - * request fails repeatedly. For this reason, it is essential to use a - * {@link TransactionOutboxListener} to watch for failing requests and investigate - * quickly. Note that other topics will be unaffected.
  • - *
  • For the same reason, {@link TransactionOutboxBuilder#blockAfterAttempts} is - * ignored for all requests that use this option. The only safe way to recover from - * a failing request is to make the request succeed.
  • + *
  • Requests are not processed immediately when submitting a request, as normal, and are + * not processed by {@link TransactionOutbox#flush()} either. Requests are processed up + * only when calling {@link TransactionOutbox#flushOrdered(Executor)}. As a result there + * will be increased delay between the source transaction being committed and the request + * being processed. + *
  • If a request fails, no further requests will be processed in that topic until + * a subsequent retry allows the failing request to succeed, to preserve ordered + * processing. This means it is possible for topics to become entirely frozen in the event + * that a request fails repeatedly. For this reason, it is essential to use a {@link + * TransactionOutboxListener} to watch for failing requests and investigate quickly. Note + * that other topics will be unaffected. + *
  • For the same reason, {@link TransactionOutboxBuilder#blockAfterAttempts} is ignored for + * all requests that use this option. The only safe way to recover from a failing request + * is to make the request succeed. *
  • A single topic can only be processed in single-threaded fashion, so if your requests - * use a small number of topics, scalability will be affected since the degree of - * parallelism will be reduced.
  • + * use a small number of topics, scalability will be affected since the degree of + * parallelism will be reduced. *
  • Throughput is significantly reduced and database load increased more generally, even - * with larger numbers of topics, since records are only processed one-at-a-time rather - * than in batches, which is less optimised.
  • - *
  • In general, - * - * databases are not well optimised for this sort of thing. Don't expect miracles. If - * you need more throughput, you probably need to think twice about your architecture. - * Consider the event - * sourcing pattern, for example, where the message queue is the primary data - * store rather than a secondary, and remove the need for an outbox entirely.
  • + * with larger numbers of topics, since records are only processed one-at-a-time rather + * than in batches, which is less optimised. + *
  • In general, databases + * are not well optimised for this sort of thing. Don't expect miracles. If you need + * more throughput, you probably need to think twice about your architecture. Consider the + * event sourcing + * pattern, for example, where the message queue is the primary data store rather than + * a secondary, and remove the need for an outbox entirely. *
* * @param topic a free-text string up to 250 characters. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index ab0a6cfc..63a5e050 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -163,7 +163,6 @@ public void validate(Validator validator) { validator.nullOrNotBlank("uniqueRequestId", uniqueRequestId); validator.nullOrNotBlank("topic", topic); validator.notNull("invocation", invocation); - validator.inFuture("nextAttemptTime", nextAttemptTime); validator.positiveOrZero("attempts", attempts); validator.positiveOrZero("version", version); } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 46b42bf4..ac89dcb9 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -8,9 +8,7 @@ import com.gruelbox.transactionoutbox.spi.ProxyFactory; import com.gruelbox.transactionoutbox.spi.Utils; import java.lang.reflect.InvocationTargetException; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; +import java.time.*; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -155,32 +153,35 @@ private List flush(Instant now) { @Override public boolean flushOrdered(Executor executor) { + log.debug("Processing topics"); return transactionManager.inTransactionReturns( tx -> { - Set failingTopics = new HashSet<>(); - Set lastSetOfResults = new HashSet<>(); - int count = 0; - do { - var nextSetOfResults = uncheckedly(() -> persistor.selectBatchOrdered(tx, flushBatchSize, clockProvider.get().instant())); - var batch = new ArrayList>(); - for (var request : nextSetOfResults) { - if (failingTopics.contains(request.getTopic())) { + var now = clockProvider.get().instant(); + var topics = uncheckedly(() -> persistor.selectActiveTopics(tx)); + var futures = new ArrayList>(); + if (!topics.isEmpty()) { + log.info("Active topics: {}", topics); + for (String topic : topics) { + var request = uncheckedly(() -> persistor.nextInTopic(tx, topic)); + if (request.isEmpty()) { + log.info(" > [{}] is already processed", topic); continue; } - if (lastSetOfResults.contains(request)) { - failingTopics.add(request.getTopic()); - } else { - count++; - batch.add(CompletableFuture.runAsync(() -> processNow(request), executor)); + if (!request.get().getNextAttemptTime().isBefore(now)) { + log.info( + " > [{}] seq {} is not ready for retry. Next attempt after {}", + topic, + request.get().getSequence(), + ZonedDateTime.ofInstant(request.get().getNextAttemptTime(), ZoneId.of("UTC"))); + continue; } + var future = CompletableFuture.runAsync(() -> processNow(request.get()), executor); + futures.add(future); } - lastSetOfResults = nextSetOfResults; - if (batch.isEmpty()) { - break; - } - CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).join(); - } while (count < flushBatchSize); - return count > 0; + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + return !futures.isEmpty(); + } + return false; }); } @@ -363,7 +364,7 @@ private TransactionOutboxEntry newEntry( args, serializeMdc && (MDC.getMDCAdapter() != null) ? MDC.getCopyOfContextMap() : null)) .lastAttemptTime(null) - .nextAttemptTime(after(attemptFrequency)) + .nextAttemptTime(clockProvider.get().instant()) .uniqueRequestId(uniqueRequestId) .topic(topic) .build(); diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java index 0454c930..0d95a6b0 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java @@ -1,7 +1,6 @@ package com.gruelbox.transactionoutbox; import java.time.Clock; -import java.time.Instant; import java.util.function.Supplier; class Validator { @@ -51,13 +50,6 @@ public void notBlank(String propertyName, String object) { } } - public void inFuture(String propertyName, Instant object) { - notNull(propertyName, object); - if (!object.isAfter(clockProvider.get().instant())) { - error(propertyName, "must be in the future"); - } - } - public void positiveOrZero(String propertyName, int object) { min(propertyName, object, 0); } diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 538ad5ff..99074582 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -41,7 +41,7 @@ public abstract class AbstractAcceptanceTest extends BaseTest { private static final Random random = new Random(); @Test - final void sequencing() { + final void sequencing() throws Exception { TransactionManager transactionManager = txManager(); TransactionOutbox outbox = TransactionOutbox.builder() @@ -58,13 +58,19 @@ final void sequencing() { outbox.initialize(); clearOutbox(); - transactionManager.inTransaction( - () -> - outbox - .with() - .ordered("my-topic") - .schedule(InterfaceProcessor.class) - .process(3, "Whee")); + withRunningFlusher( + outbox, + () -> { + transactionManager.inTransaction( + () -> + outbox + .with() + .ordered("my-topic") + .schedule(InterfaceProcessor.class) + .process(3, "Whee")); + + Thread.sleep(1000); + }); } /** diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java index 7cf9ed2f..28baf52b 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java @@ -4,6 +4,7 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.SQLException; +import java.util.concurrent.ForkJoinPool; import lombok.Builder; import lombok.Value; import lombok.experimental.Accessors; @@ -73,8 +74,10 @@ protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable run // Keep flushing work until there's nothing left to flush //noinspection StatementWithEmptyBody while (outbox.flush()) {} + //noinspection StatementWithEmptyBody + while (outbox.flushOrdered(ForkJoinPool.commonPool())) {} } catch (Exception e) { - log.error("Error flushing transaction outbox. Pausing", e); + log.error("Error flushing transaction outbox", e); } try { //noinspection BusyWait From 921d313848d58bdc0608fe449f511cfe9b02b954 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sat, 16 Mar 2024 22:49:05 +0000 Subject: [PATCH 04/11] Fix warnings --- .../java/com/gruelbox/transactionoutbox/StubPersistor.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java index 4a827f5b..a9826388 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java @@ -47,13 +47,12 @@ public List selectBatch(Transaction tx, int batchSize, I } @Override - public List selectActiveTopics(Transaction tx) throws Exception { + public List selectActiveTopics(Transaction tx) { return List.of(); } @Override - public Optional nextInTopic(Transaction tx, String topic) - throws Exception { + public Optional nextInTopic(Transaction tx, String topic) { return Optional.empty(); } From 90b28647d189ec551b73e7eac3c86ef66f882991 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 17 Mar 2024 14:08:21 +0000 Subject: [PATCH 05/11] Add testing and fix a bunch of things --- .../transactionoutbox/DefaultDialect.java | 13 ++-- .../transactionoutbox/DefaultPersistor.java | 26 +++---- .../gruelbox/transactionoutbox/Dialect.java | 14 ++-- .../transactionoutbox/TransactionOutbox.java | 31 ++++---- .../TransactionOutboxEntry.java | 1 + .../TransactionOutboxImpl.java | 76 ++++++++++--------- .../gruelbox/transactionoutbox/Validator.java | 6 ++ .../transactionoutbox/TestValidator.java | 23 ------ .../testing/AbstractAcceptanceTest.java | 41 ++++++++-- .../transactionoutbox/testing/BaseTest.java | 15 ++-- 10 files changed, 131 insertions(+), 115 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index 4d448547..6fdaffb4 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -123,14 +123,16 @@ static final class Builder { migrations.put( 9, new Migration( - 9, "Add topic", "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NULL")); + 9, + "Add topic", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NOT NULL DEFAULT '*'")); migrations.put( 10, new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL")); migrations.put( 11, new Migration( - 10, + 11, "Add sequence table", "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))")); migrations.put( @@ -156,12 +158,7 @@ Builder disableMigration(@SuppressWarnings("SameParameterValue") int version) { Dialect build() { return new DefaultDialect( - name, - supportsSkipLock, - deleteExpired, - limitCriteria, - checkSql, - migrations.values()) { + name, supportsSkipLock, deleteExpired, limitCriteria, checkSql, migrations.values()) { @Override public String booleanValue(boolean criteriaValue) { if (booleanValueFrom != null) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 3cf6669d..bdc304c9 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -4,13 +4,7 @@ import java.io.Reader; import java.io.StringWriter; import java.io.Writer; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.SQLTimeoutException; -import java.sql.Statement; -import java.sql.Timestamp; +import java.sql.*; import java.time.Instant; import java.util.*; import lombok.AccessLevel; @@ -185,8 +179,12 @@ private void setupInsert( stmt.setString(1, entry.getId()); stmt.setString(2, entry.getUniqueRequestId()); stmt.setString(3, writer.toString()); - stmt.setString(4, entry.getTopic() == null ? "" : entry.getTopic()); - stmt.setLong(5, entry.getSequence()); + stmt.setString(4, entry.getTopic() == null ? "*" : entry.getTopic()); + if (entry.getSequence() == null) { + stmt.setObject(5, null); + } else { + stmt.setLong(5, entry.getSequence()); + } stmt.setTimestamp( 6, entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime())); stmt.setTimestamp(7, Timestamp.from(entry.getNextAttemptTime())); @@ -317,7 +315,7 @@ public List selectBatch(Transaction tx, int batchSize, I + dialect.booleanValue(false) + " AND processed = " + dialect.booleanValue(false) - + " AND topic = ''" + + " AND topic = '*'" + dialect.getLimitCriteria() + forUpdate)) { stmt.setTimestamp(1, Timestamp.from(now)); @@ -330,7 +328,7 @@ public List selectBatch(Transaction tx, int batchSize, I @Override public List selectActiveTopics(Transaction tx) throws Exception { - var sql = "SELECT DISTINCT topic FROM %s WHERE topic <> '' AND processed = %s"; + var sql = "SELECT DISTINCT topic FROM %s WHERE topic <> '*' AND processed = %s"; String falseStr = dialect.booleanValue(false); //noinspection resource try (PreparedStatement stmt = @@ -351,10 +349,10 @@ public Optional nextInTopic(Transaction tx, String topic PreparedStatement stmt = tx.prepareBatchStatement( String.format( - "SELECT %s FROM %s " + "SELECT * FROM (SELECT %s FROM %s " + "WHERE topic = ? " + "AND processed = %s " - + "ORDER BY seq ASC %s", + + "ORDER BY seq ASC) x WHERE 1=1 %s", ALL_FIELDS, tableName, dialect.booleanValue(false), dialect.getLimitCriteria())); stmt.setString(1, topic); stmt.setInt(2, 1); @@ -398,7 +396,7 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio .id(rs.getString("id")) .uniqueRequestId(rs.getString("uniqueRequestId")) .invocation(serializer.deserializeInvocation(invocationStream)) - .topic("".equals(topic) ? null : topic) + .topic("*".equals(topic) ? null : topic) .sequence(sequence) .lastAttemptTime( rs.getTimestamp("lastAttemptTime") == null diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 294134c9..ebc11260 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -31,15 +31,10 @@ public interface Dialect { Stream getMigrations(); Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); - Dialect MY_SQL_8 = - DefaultDialect.builder("MY_SQL_8") - .supportsSkipLock(true) - .supportsWindowFunctions(true) - .build(); + Dialect MY_SQL_8 = DefaultDialect.builder("MY_SQL_8").supportsSkipLock(true).build(); Dialect POSTGRESQL_9 = DefaultDialect.builder("POSTGRESQL_9") .supportsSkipLock(true) - .supportsWindowFunctions(true) .deleteExpired( "DELETE FROM {{table}} WHERE id IN (SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT ?)") .changeMigration( @@ -51,14 +46,12 @@ public interface Dialect { Dialect H2 = DefaultDialect.builder("H2") - .supportsWindowFunctions(true) .changeMigration(5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId VARCHAR(250)") .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") .disableMigration(8) .build(); Dialect ORACLE = DefaultDialect.builder("ORACLE") - .supportsWindowFunctions(true) .supportsSkipLock(true) .deleteExpired( "DELETE FROM {{table}} WHERE nextAttemptTime < ? AND processed = 1 AND blocked = 0 AND ROWNUM <= ?") @@ -81,6 +74,11 @@ public interface Dialect { .changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked") .changeMigration(7, "ALTER TABLE TXNO_OUTBOX ADD lastAttemptTime TIMESTAMP(6)") .disableMigration(8) + .changeMigration(9, "ALTER TABLE TXNO_OUTBOX ADD topic VARCHAR(250) DEFAULT '*' NOT NULL") + .changeMigration(10, "ALTER TABLE TXNO_OUTBOX ADD seq NUMBER") + .changeMigration( + 11, + "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq NUMBER NOT NULL, CONSTRAINT PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))") .booleanValueFrom(v -> v ? "1" : "0") .createVersionTableBy( connection -> { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index faeab1ab..59bcdf67 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -66,6 +66,17 @@ static TransactionOutboxBuilder builder() { */ ParameterizedScheduleBuilder with(); + /** + * Flush in a single thread. Calls {@link #flush(Executor)} with an {@link Executor} which runs + * all work in the current thread. + * + * @see #flush(Executor) + * @return true if any work was flushed. + */ + default boolean flush() { + return flush(Runnable::run); + } + /** * Identifies any stale tasks queued using {@link #schedule(Class)} (those which were queued more * than supplied {@link TransactionOutboxBuilder#attemptFrequency(Duration)} ago and have been @@ -85,18 +96,11 @@ static TransactionOutboxBuilder builder() { *

Additionally, expires any records completed prior to the {@link * TransactionOutboxBuilder#retentionThreshold(Duration)}. * + * @param executor to be used for parallelising work (note that the method overall is blocking and + * this is solely ued for fork-join semantics). * @return true if any work was flushed. */ - @SuppressWarnings("UnusedReturnValue") - boolean flush(); - - /** - * TODO - * - * @param executor - * @return - */ - boolean flushOrdered(Executor executor); + boolean flush(Executor executor); /** * Unblocks a blocked entry and resets the attempt count so that it will be retried again. @@ -332,10 +336,9 @@ interface ParameterizedScheduleBuilder { * *

    *
  • Requests are not processed immediately when submitting a request, as normal, and are - * not processed by {@link TransactionOutbox#flush()} either. Requests are processed up - * only when calling {@link TransactionOutbox#flushOrdered(Executor)}. As a result there - * will be increased delay between the source transaction being committed and the request - * being processed. + * processed by {@link TransactionOutbox#flush()} only. As a result there will be + * increased delay between the source transaction being committed and the request being + * processed. *
  • If a request fails, no further requests will be processed in that topic until * a subsequent retry allows the failing request to succeed, to preserve ordered * processing. This means it is possible for topics to become entirely frozen in the event diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index 63a5e050..01749722 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -165,5 +165,6 @@ public void validate(Validator validator) { validator.notNull("invocation", invocation); validator.positiveOrZero("attempts", attempts); validator.positiveOrZero("version", version); + validator.isTrue("topic", !"*".equals(topic), "Topic may not be *"); } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index ac89dcb9..19ab5e3c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -114,19 +114,7 @@ public ParameterizedScheduleBuilder with() { return new ParameterizedScheduleBuilderImpl(); } - @SuppressWarnings("UnusedReturnValue") - @Override - public boolean flush() { - if (!initialized.get()) { - throw new IllegalStateException("Not initialized"); - } - Instant now = clockProvider.get().instant(); - List batch = flush(now); - expireIdempotencyProtection(now); - return !batch.isEmpty(); - } - - private List flush(Instant now) { + private boolean flushStale(Instant now) { log.debug("Flushing stale tasks"); var batch = transactionManager.inTransactionReturns( @@ -148,41 +136,57 @@ private List flush(Instant now) { log.debug("Got batch of {}", batch.size()); batch.forEach(this::submitNow); log.debug("Submitted batch"); - return batch; + return !batch.isEmpty(); } @Override - public boolean flushOrdered(Executor executor) { - log.debug("Processing topics"); - return transactionManager.inTransactionReturns( + public boolean flush(Executor executor) { + if (!initialized.get()) { + throw new IllegalStateException("Not initialized"); + } + Instant now = clockProvider.get().instant(); + List> futures = new ArrayList<>(); + + futures.add(CompletableFuture.supplyAsync(() -> flushStale(now), executor)); + + futures.add( + CompletableFuture.runAsync(() -> expireIdempotencyProtection(now), executor) + .thenApply(it -> false)); + + transactionManager.inTransaction( tx -> { - var now = clockProvider.get().instant(); var topics = uncheckedly(() -> persistor.selectActiveTopics(tx)); - var futures = new ArrayList>(); if (!topics.isEmpty()) { log.info("Active topics: {}", topics); for (String topic : topics) { - var request = uncheckedly(() -> persistor.nextInTopic(tx, topic)); - if (request.isEmpty()) { - log.info(" > [{}] is already processed", topic); - continue; - } - if (!request.get().getNextAttemptTime().isBefore(now)) { - log.info( - " > [{}] seq {} is not ready for retry. Next attempt after {}", - topic, - request.get().getSequence(), - ZonedDateTime.ofInstant(request.get().getNextAttemptTime(), ZoneId.of("UTC"))); - continue; + var future = flushTopic(executor, tx, topic, now); + if (future != null) { + futures.add(future.thenApply(it -> true)); } - var future = CompletableFuture.runAsync(() -> processNow(request.get()), executor); - futures.add(future); } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - return !futures.isEmpty(); } - return false; }); + + var allResults = futures.stream().reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> d1 || d2)); + return allResults.map(CompletableFuture::join).orElse(false); + } + + private CompletableFuture flushTopic( + Executor executor, Transaction tx, String topic, Instant now) { + var request = uncheckedly(() -> persistor.nextInTopic(tx, topic)); + if (request.isEmpty()) { + log.info(" > [{}] is already processed", topic); + return null; + } + if (!request.get().getNextAttemptTime().isBefore(now)) { + log.info( + " > [{}] seq {} is not ready for retry. Next attempt after {}", + topic, + request.get().getSequence(), + ZonedDateTime.ofInstant(request.get().getNextAttemptTime(), ZoneId.of("UTC"))); + return null; + } + return CompletableFuture.runAsync(() -> processNow(request.get()), executor); } private void expireIdempotencyProtection(Instant now) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java index 0d95a6b0..a672afa5 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Validator.java @@ -37,6 +37,12 @@ public void notNull(String propertyName, Object object) { } } + public void isTrue(String propertyName, boolean condition, String message, Object... args) { + if (!condition) { + error(propertyName, String.format(message, args)); + } + } + public void nullOrNotBlank(String propertyName, String object) { if (object != null && object.isEmpty()) { error(propertyName, "may be either null or non-blank"); diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java index 305a97be..e1921d03 100644 --- a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestValidator.java @@ -1,7 +1,6 @@ package com.gruelbox.transactionoutbox; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; import java.math.BigDecimal; import java.time.Clock; @@ -21,28 +20,6 @@ class TestValidator { private final Instant now = Instant.now(); private final Validator validator = new Validator(() -> Clock.fixed(now, ZoneId.of("+4"))); - @Test - void testEntryDateInPast() { - TransactionOutboxEntry entry = - TransactionOutboxEntry.builder() - .id("FOO") - .invocation(COMPLEX_INVOCATION) - .nextAttemptTime(now.minusMillis(1)) - .build(); - assertThrows(IllegalArgumentException.class, () -> validator.validate(entry)); - } - - @Test - void testEntryDateNow() { - TransactionOutboxEntry entry = - TransactionOutboxEntry.builder() - .id("FOO") - .invocation(COMPLEX_INVOCATION) - .nextAttemptTime(now) - .build(); - assertThrows(IllegalArgumentException.class, () -> validator.validate(entry)); - } - @Test void testEntryDateFuture() { TransactionOutboxEntry entry = diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 99074582..6eda9b3d 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -42,6 +42,10 @@ public abstract class AbstractAcceptanceTest extends BaseTest { @Test final void sequencing() throws Exception { + AtomicInteger lastSequenceTopic1 = new AtomicInteger(); + AtomicInteger lastSequenceTopic2 = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(200); + TransactionManager transactionManager = txManager(); TransactionOutbox outbox = TransactionOutbox.builder() @@ -50,7 +54,20 @@ final void sequencing() throws Exception { Instantiator.using( clazz -> (InterfaceProcessor) - (foo, bar) -> LOGGER.info("Processing ({}, {})", foo, bar))) + (foo, bar) -> { + var lastSequence = + ("topic1".equals(bar)) ? lastSequenceTopic1 : lastSequenceTopic2; + if ((lastSequence.get() == foo - 1)) { + lastSequence.set(foo); + latch.countDown(); + } else { + latch.countDown(); + throw new IllegalStateException( + String.format( + "Ordering violated (previous=%s, seq=%s", + lastSequence.get(), foo)); + } + })) .persistor(persistor()) .initializeImmediately(false) .build(); @@ -61,15 +78,25 @@ final void sequencing() throws Exception { withRunningFlusher( outbox, () -> { - transactionManager.inTransaction( - () -> + for (int ix = 1; ix <= 100; ix++) { + final int i = ix; + transactionManager.inTransaction( + () -> { outbox .with() - .ordered("my-topic") + .ordered("topic1") .schedule(InterfaceProcessor.class) - .process(3, "Whee")); - - Thread.sleep(1000); + .process(i, "topic1"); + outbox + .with() + .ordered("topic2") + .schedule(InterfaceProcessor.class) + .process(i, "topic2"); + }); + } + assertTrue(latch.await(30, SECONDS)); + assertEquals(100, lastSequenceTopic1.get()); + assertEquals(100, lastSequenceTopic2.get()); }); } diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java index 28baf52b..4145c4f2 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java @@ -4,18 +4,22 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.SQLException; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import lombok.Builder; import lombok.Value; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @Slf4j public abstract class BaseTest { protected HikariDataSource dataSource; + private ExecutorService flushExecutor; @BeforeEach final void baseBeforeEach() { @@ -25,10 +29,13 @@ final void baseBeforeEach() { config.setPassword(connectionDetails().password()); config.addDataSourceProperty("cachePrepStmts", "true"); dataSource = new HikariDataSource(config); + flushExecutor = Executors.newFixedThreadPool(4); } @AfterEach - final void baseAfterEach() { + final void baseAfterEach() throws InterruptedException { + flushExecutor.shutdown(); + Assertions.assertTrue(flushExecutor.awaitTermination(30, TimeUnit.SECONDS)); dataSource.close(); } @@ -73,9 +80,7 @@ protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable run try { // Keep flushing work until there's nothing left to flush //noinspection StatementWithEmptyBody - while (outbox.flush()) {} - //noinspection StatementWithEmptyBody - while (outbox.flushOrdered(ForkJoinPool.commonPool())) {} + while (outbox.flush(flushExecutor)) {} } catch (Exception e) { log.error("Error flushing transaction outbox", e); } From f6c01b0a0c7b430a30f3bdea9ccfb441e04b4bd6 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 17 Mar 2024 16:24:35 +0000 Subject: [PATCH 06/11] WIP --- .../transactionoutbox/DefaultDialect.java | 21 +++ .../transactionoutbox/DefaultPersistor.java | 9 +- .../gruelbox/transactionoutbox/Dialect.java | 12 ++ .../transactionoutbox/Invocation.java | 19 +++ .../TransactionOutboxImpl.java | 140 +++++++++++------- 5 files changed, 136 insertions(+), 65 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index 6fdaffb4..c941f3cc 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -6,6 +6,7 @@ import java.util.Collection; import java.util.Map; import java.util.TreeMap; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; import lombok.AccessLevel; @@ -41,6 +42,17 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep } } + @Override + public String fetchAndLockNextInTopic(String fields, String table) { + return String.format( + "SELECT %s FROM %s" + + " WHERE topic = " + + " AND processed = %s" + + " ORDER BY seq ASC" + + " %s FOR UPDATE", + fields, table, booleanValue(false), limitCriteria.replace("?", "1")); + } + @Override public String toString() { return name; @@ -63,6 +75,7 @@ static final class Builder { private Map migrations; private Function booleanValueFrom; private SQLAction createVersionTableBy; + private BiFunction fetchAndLockNextInTopic; Builder(String name) { this.name = name; @@ -175,6 +188,14 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep super.createVersionTableIfNotExists(connection); } } + + @Override + public String fetchAndLockNextInTopic(String fields, String table) { + if (fetchAndLockNextInTopic != null) { + return fetchAndLockNextInTopic.apply(fields, table); + } + return super.fetchAndLockNextInTopic(fields, table); + } }; } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index bdc304c9..49f4f628 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -347,15 +347,8 @@ public List selectActiveTopics(Transaction tx) throws Exception { public Optional nextInTopic(Transaction tx, String topic) throws Exception { PreparedStatement stmt = - tx.prepareBatchStatement( - String.format( - "SELECT * FROM (SELECT %s FROM %s " - + "WHERE topic = ? " - + "AND processed = %s " - + "ORDER BY seq ASC) x WHERE 1=1 %s", - ALL_FIELDS, tableName, dialect.booleanValue(false), dialect.getLimitCriteria())); + tx.prepareBatchStatement(dialect.fetchAndLockNextInTopic(ALL_FIELDS, tableName)); stmt.setString(1, topic); - stmt.setInt(2, 1); var results = new ArrayList(1); gatherResults(stmt, results); return results.stream().findFirst(); diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index ebc11260..c1aff8ed 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -28,6 +28,8 @@ public interface Dialect { void createVersionTableIfNotExists(Connection connection) throws SQLException; + String fetchAndLockNextInTopic(String fields, String table); + Stream getMigrations(); Dialect MY_SQL_5 = DefaultDialect.builder("MY_SQL_5").build(); @@ -93,5 +95,15 @@ public interface Dialect { } } }) + .fetchAndLockNextInTopic( + (fields, table) -> + String.format( + "SELECT %s FROM %s outer" + + " WHERE outer.topic = ?" + + " AND outer.processed = 0" + + " AND outer.seq = (" + + "SELECT MIN(seq) FROM %s inner WHERE inner.topic=outer.topic AND inner.processed=0" + + " ) FOR UPDATE", + fields, table, table)) .build(); } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java index 0a91e113..3cc815d8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java @@ -5,6 +5,7 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.Callable; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; @@ -108,6 +109,24 @@ void withinMDC(Runnable runnable) { } } + T withinMDC(Callable callable) throws Exception { + if (mdc != null && MDC.getMDCAdapter() != null) { + var oldMdc = MDC.getCopyOfContextMap(); + MDC.setContextMap(mdc); + try { + return callable.call(); + } finally { + if (oldMdc == null) { + MDC.clear(); + } else { + MDC.setContextMap(oldMdc); + } + } + } else { + return callable.call(); + } + } + void invoke(Object instance, TransactionOutboxListener listener) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { Method method = instance.getClass().getDeclaredMethod(methodName, parameterTypes); diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 19ab5e3c..1c557ad8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -13,6 +13,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -159,10 +160,9 @@ public boolean flush(Executor executor) { if (!topics.isEmpty()) { log.info("Active topics: {}", topics); for (String topic : topics) { - var future = flushTopic(executor, tx, topic, now); - if (future != null) { - futures.add(future.thenApply(it -> true)); - } + futures.add( + CompletableFuture.runAsync(() -> processNextInTopic(topic), executor) + .thenApply(it -> true)); } } }); @@ -171,24 +171,6 @@ public boolean flush(Executor executor) { return allResults.map(CompletableFuture::join).orElse(false); } - private CompletableFuture flushTopic( - Executor executor, Transaction tx, String topic, Instant now) { - var request = uncheckedly(() -> persistor.nextInTopic(tx, topic)); - if (request.isEmpty()) { - log.info(" > [{}] is already processed", topic); - return null; - } - if (!request.get().getNextAttemptTime().isBefore(now)) { - log.info( - " > [{}] seq {} is not ready for retry. Next attempt after {}", - topic, - request.get().getSequence(), - ZonedDateTime.ofInstant(request.get().getNextAttemptTime(), ZoneId.of("UTC"))); - return null; - } - return CompletableFuture.runAsync(() -> processNow(request.get()), executor); - } - private void expireIdempotencyProtection(Instant now) { long totalRecordsDeleted = 0; int recordsDeleted; @@ -300,45 +282,89 @@ private void submitNow(TransactionOutboxEntry entry) { @Override @SuppressWarnings("WeakerAccess") public void processNow(TransactionOutboxEntry entry) { + initialize(); + Boolean success = null; + try { + success = + transactionManager.inTransactionReturnsThrows( + tx -> { + if (!persistor.lock(tx, entry)) { + return false; + } + processWithExistingLock(tx, entry); + return true; + }); + } catch (InvocationTargetException e) { + updateAttemptCount(entry, e.getCause()); + } catch (Exception e) { + updateAttemptCount(entry, e); + } + if (success != null) { + if (success) { + log.info("Processed {}", entry.description()); + listener.success(entry); + } else { + log.debug("Skipped task {} - may be locked or already processed", entry.getId()); + } + } + } + + private void processNextInTopic(String topic) { + var attempted = new AtomicReference(); + var success = false; + try { + success = + transactionManager.inTransactionReturnsThrows( + tx -> { + var next = + persistor + .nextInTopic(tx, topic) + .filter( + it -> it.getNextAttemptTime().isBefore(clockProvider.get().instant())); + if (next.isPresent()) { + attempted.set(next.get()); + processWithExistingLock(tx, next.get()); + return true; + } + return false; + }); + } catch (InvocationTargetException e) { + if (attempted.get() != null) { + updateAttemptCount(attempted.get(), e.getCause()); + } + } catch (Exception e) { + if (attempted.get() != null) { + updateAttemptCount(attempted.get(), e); + } + } + if (success) { + log.info("Processed {}", attempted.get().description()); + listener.success(attempted.get()); + } else if (attempted.get() == null) { + log.debug("Nothing available in topic {}. May be empty or pausing for retry", topic); + } + } + + private void processWithExistingLock(Transaction tx, TransactionOutboxEntry entry) + throws Exception { + initialize(); entry .getInvocation() .withinMDC( () -> { - try { - initialize(); - var success = - transactionManager.inTransactionReturnsThrows( - transaction -> { - if (!persistor.lock(transaction, entry)) { - return false; - } - log.info("Processing {}", entry.description()); - invoke(entry, transaction); - if (entry.getUniqueRequestId() == null) { - persistor.delete(transaction, entry); - } else { - log.debug( - "Deferring deletion of {} by {}", - entry.description(), - retentionThreshold); - entry.setProcessed(true); - entry.setLastAttemptTime(Instant.now(clockProvider.get())); - entry.setNextAttemptTime(after(retentionThreshold)); - persistor.update(transaction, entry); - } - return true; - }); - if (success) { - log.info("Processed {}", entry.description()); - listener.success(entry); - } else { - log.debug("Skipped task {} - may be locked or already processed", entry.getId()); - } - } catch (InvocationTargetException e) { - updateAttemptCount(entry, e.getCause()); - } catch (Exception e) { - updateAttemptCount(entry, e); + log.info("Processing {}", entry.description()); + invoke(entry, tx); + if (entry.getUniqueRequestId() == null) { + persistor.delete(tx, entry); + } else { + log.debug( + "Deferring deletion of {} by {}", entry.description(), retentionThreshold); + entry.setProcessed(true); + entry.setLastAttemptTime(Instant.now(clockProvider.get())); + entry.setNextAttemptTime(after(retentionThreshold)); + persistor.update(tx, entry); } + return true; }); } @@ -395,7 +421,7 @@ private Instant after(Duration duration) { private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) { try { entry.setAttempts(entry.getAttempts() + 1); - var blocked = entry.getAttempts() >= blockAfterAttempts; + var blocked = entry.getTopic() != null & entry.getAttempts() >= blockAfterAttempts; entry.setBlocked(blocked); entry.setLastAttemptTime(Instant.now(clockProvider.get())); entry.setNextAttemptTime(after(attemptFrequency)); From 5ab2f66cb5560cd273b5e8849b07ea205fca5bf6 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 17 Mar 2024 17:30:07 +0000 Subject: [PATCH 07/11] Tidying --- .github/workflows/cd_build.yml | 2 +- README.md | 53 ++++++++++++++++++- .../transactionoutbox/DefaultDialect.java | 2 +- .../TransactionOutboxImpl.java | 51 +++++++++--------- .../testing/AbstractAcceptanceTest.java | 17 ++++-- 5 files changed, 92 insertions(+), 33 deletions(-) diff --git a/.github/workflows/cd_build.yml b/.github/workflows/cd_build.yml index c653cd67..180d7351 100644 --- a/.github/workflows/cd_build.yml +++ b/.github/workflows/cd_build.yml @@ -25,7 +25,7 @@ jobs: - name: Build, publish to GPR and tag run: | if [ "$GITHUB_REPOSITORY" == "gruelbox/transaction-outbox" ]; then - revision="5.4.$GITHUB_RUN_NUMBER" + revision="5.5.$GITHUB_RUN_NUMBER" echo "Building $revision at $GITHUB_SHA" mvn -Pconcise,delombok -B deploy -s $GITHUB_WORKSPACE/settings.xml -Drevision="$revision" -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn echo "Tagging $revision" diff --git a/README.md b/README.md index f7bf45ed..db1b7cfe 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ A flexible implementation of the [Transaction Outbox Pattern](https://microservi 1. [Set up the background worker](#set-up-the-background-worker) 1. [Managing the "dead letter queue"](#managing-the-dead-letter-queue) 1. [Advanced](#advanced) + 1. [Topics and FIFO ordering](#topics-and-fifo-ordering) 1. [The nested outbox pattern](#the-nested-outbox-pattern) 1. [Idempotency protection](#idempotency-protection) 1. [Flexible serialization](#flexible-serialization-beta) @@ -291,13 +292,13 @@ TransactionOutbox.builder() To mark the work for reprocessing, just use [`TransactionOutbox.unblock()`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-core/latest/com/gruelbox/transactionoutbox/TransactionOutbox.html). Its failure count will be marked back down to zero and it will get reprocessed on the next call to `flush()`: -``` +```java transactionOutboxEntry.unblock(entryId); ``` Or if using a `TransactionManager` that relies on explicit context (such as a non-thread local [`JooqTransactionManager`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-jooq/latest/com/gruelbox/transactionoutbox/JooqTransactionManager.html)): -``` +```java transactionOutboxEntry.unblock(entryId, context); ``` @@ -305,6 +306,54 @@ A good approach here is to use the [`TransactionOutboxListener`](https://www.jav ## Advanced +### Topics and FIFO ordering + +For some applications, the order in which tasks are processed is important, such as when: + + - using the outbox to write to a FIFO queue, Kafka or AWS Kinesis topic; or + - data replication, e.g. when feeding a data warehouse or distributed cache. + +In these scenarios, the default behaviour is unsuitable. Tasks are usually processed in a highly parallel fashion. +Even if the volume of tasks is low, if a task fails and is retried later, it can easily end up processing after +some later task even if that later task was processed hours or even days after the failing one. + +To avoid problems associated with tasks being processed out-of-order, you can order the processing of your tasks +within a named "topic": + +```java +outbox.with().ordered("topic1").schedule(Service.class).process("red"); +outbox.with().ordered("topic2").schedule(Service.class).process("green"); +outbox.with().ordered("topic1").schedule(Service.class).process("blue"); +outbox.with().ordered("topic2").schedule(Service.class).process("yellow"); +``` + +No matter what happens: + + - `red` will always need to be processed (successfully) before `blue`; + - `green` will always need to be processed (successfully) before `yellow`; but + - `red` and `blue` can run in any sequence with respect to `green` and `yellow`. + +This functionality was specifically designed to allow outboxed writing to Kafka topics. For maximum throughput +when writing to Kafka, it is advised that you form your outbox topic name by combining the Kafka topic and partition, +since that is the boundary where ordering is required. + +There are a number of things to consider before using this feature: + + - Tasks are not processed immediately when submitting, as normal, and are processed by + background flushing only. This means there will be an increased delay between the source transaction being + committed and the task being processed, depending on how your application calls `TransactionOutbox.flush()`. + - If a task fails, no further requests will be processed _in that topic_ until + a subsequent retry allows the failing task to succeed, to preserve ordered + processing. This means it is possible for topics to become entirely frozen in the event + that a task fails repeatedly. For this reason, it is essential to use a + `TransactionOutboxListener` to watch for failing tasks and investigate quickly. Note + that other topics will be unaffected. + - `TransactionOutboxBuilder.blockAfterAttempts` is ignored for all tasks that use this + option. + - A single topic can only be processed in single-threaded fashion, but separate topics can be processed in + parallel. If your tasks use a small number of topics, scalability will be affected since the degree of + parallelism will be reduced. + ### The nested-outbox pattern In practice it can be extremely hard to guarantee that an entire unit of work is idempotent and thus suitable for retry. For example, the request might be to "update a customer record" with a new address, but this might record the change to an audit history table with a fresh UUID, the current date and time and so on, which in turn triggers external changes outside the transaction. The parent customer update request may be idempotent, but the downstream effects may not be. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index c941f3cc..7eb95e93 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -46,7 +46,7 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep public String fetchAndLockNextInTopic(String fields, String table) { return String.format( "SELECT %s FROM %s" - + " WHERE topic = " + + " WHERE topic = ?" + " AND processed = %s" + " ORDER BY seq ASC" + " %s FOR UPDATE", diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 1c557ad8..482f7058 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -154,18 +154,14 @@ public boolean flush(Executor executor) { CompletableFuture.runAsync(() -> expireIdempotencyProtection(now), executor) .thenApply(it -> false)); - transactionManager.inTransaction( - tx -> { - var topics = uncheckedly(() -> persistor.selectActiveTopics(tx)); - if (!topics.isEmpty()) { - log.info("Active topics: {}", topics); - for (String topic : topics) { - futures.add( - CompletableFuture.runAsync(() -> processNextInTopic(topic), executor) - .thenApply(it -> true)); - } - } - }); + var topics = + transactionManager.inTransactionReturns( + tx -> uncheckedly(() -> persistor.selectActiveTopics(tx))); + if (!topics.isEmpty()) { + topics.stream() + .map(topic -> CompletableFuture.supplyAsync(() -> processNextInTopic(topic), executor)) + .forEach(futures::add); + } var allResults = futures.stream().reduce((f1, f2) -> f1.thenCombine(f2, (d1, d2) -> d1 || d2)); return allResults.map(CompletableFuture::join).orElse(false); @@ -309,24 +305,30 @@ public void processNow(TransactionOutboxEntry entry) { } } - private void processNextInTopic(String topic) { + private boolean processNextInTopic(String topic) { var attempted = new AtomicReference(); var success = false; try { success = transactionManager.inTransactionReturnsThrows( tx -> { - var next = - persistor - .nextInTopic(tx, topic) - .filter( - it -> it.getNextAttemptTime().isBefore(clockProvider.get().instant())); + var next = persistor.nextInTopic(tx, topic); if (next.isPresent()) { - attempted.set(next.get()); - processWithExistingLock(tx, next.get()); - return true; + if (next.get().getNextAttemptTime().isBefore(clockProvider.get().instant())) { + log.info("Topic {}: processing seq {}", topic, next.get().getSequence()); + attempted.set(next.get()); + processWithExistingLock(tx, next.get()); + return true; + } else { + log.info( + "Topic {}: ignoring until {}", + topic, + ZonedDateTime.ofInstant(next.get().getNextAttemptTime(), ZoneId.of("UTC"))); + return false; + } + } else { + return false; } - return false; }); } catch (InvocationTargetException e) { if (attempted.get() != null) { @@ -340,9 +342,8 @@ private void processNextInTopic(String topic) { if (success) { log.info("Processed {}", attempted.get().description()); listener.success(attempted.get()); - } else if (attempted.get() == null) { - log.debug("Nothing available in topic {}. May be empty or pausing for retry", topic); } + return success; } private void processWithExistingLock(Transaction tx, TransactionOutboxEntry entry) @@ -421,7 +422,7 @@ private Instant after(Duration duration) { private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) { try { entry.setAttempts(entry.getAttempts() + 1); - var blocked = entry.getTopic() != null & entry.getAttempts() >= blockAfterAttempts; + var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts); entry.setBlocked(blocked); entry.setLastAttemptTime(Instant.now(clockProvider.get())); entry.setNextAttemptTime(after(attemptFrequency)); diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 6eda9b3d..ab08a8ff 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -35,6 +35,7 @@ public abstract class AbstractAcceptanceTest extends BaseTest { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptanceTest.class); + private final ExecutorService unreliablePool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16)); @@ -42,19 +43,27 @@ public abstract class AbstractAcceptanceTest extends BaseTest { @Test final void sequencing() throws Exception { + int countPerTopic = 50; + AtomicInteger lastSequenceTopic1 = new AtomicInteger(); AtomicInteger lastSequenceTopic2 = new AtomicInteger(); - CountDownLatch latch = new CountDownLatch(200); + CountDownLatch latch = new CountDownLatch(countPerTopic * 2); TransactionManager transactionManager = txManager(); TransactionOutbox outbox = TransactionOutbox.builder() .transactionManager(transactionManager) + .submitter(Submitter.withExecutor(unreliablePool)) + .attemptFrequency(Duration.ofMillis(500)) .instantiator( Instantiator.using( clazz -> (InterfaceProcessor) (foo, bar) -> { + if (random.nextInt(10) == 5) { + throw new RuntimeException( + "Temporary failure of InterfaceProcessor"); + } var lastSequence = ("topic1".equals(bar)) ? lastSequenceTopic1 : lastSequenceTopic2; if ((lastSequence.get() == foo - 1)) { @@ -78,7 +87,7 @@ final void sequencing() throws Exception { withRunningFlusher( outbox, () -> { - for (int ix = 1; ix <= 100; ix++) { + for (int ix = 1; ix <= countPerTopic; ix++) { final int i = ix; transactionManager.inTransaction( () -> { @@ -95,8 +104,8 @@ final void sequencing() throws Exception { }); } assertTrue(latch.await(30, SECONDS)); - assertEquals(100, lastSequenceTopic1.get()); - assertEquals(100, lastSequenceTopic2.get()); + assertEquals(countPerTopic, lastSequenceTopic1.get()); + assertEquals(countPerTopic, lastSequenceTopic2.get()); }); } From 528aa45c1177318e8fc501a2a55e3e29a52d9dd7 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 17 Mar 2024 21:17:08 +0000 Subject: [PATCH 08/11] Revert ham-fisted statement caching = this is for the driver to do. --- .../transactionoutbox/DefaultPersistor.java | 48 +++++++++-------- .../QuarkusTransactionManager.java | 52 +++++++------------ .../quarkus/QuarkusTransactionManager.java | 52 +++++++------------ .../SpringTransactionManager.java | 50 +++++++----------- .../spring/SpringTransactionManager.java | 50 +++++++----------- 5 files changed, 104 insertions(+), 148 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index 49f4f628..c194e594 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -280,22 +280,24 @@ public boolean lock(Transaction tx, TransactionOutboxEntry entry) throws Excepti @Override public boolean unblock(Transaction tx, String entryId) throws Exception { - @SuppressWarnings("resource") - PreparedStatement stmt = - tx.prepareBatchStatement( - "UPDATE " - + tableName - + " SET attempts = 0, blocked = " - + dialect.booleanValue(false) - + " " - + "WHERE blocked = " - + dialect.booleanValue(true) - + " AND processed = " - + dialect.booleanValue(false) - + " AND id = ?"); - stmt.setString(1, entryId); - stmt.setQueryTimeout(writeLockTimeoutSeconds); - return stmt.executeUpdate() != 0; + //noinspection resource + try (PreparedStatement stmt = + tx.connection() + .prepareStatement( + "UPDATE " + + tableName + + " SET attempts = 0, blocked = " + + dialect.booleanValue(false) + + " " + + "WHERE blocked = " + + dialect.booleanValue(true) + + " AND processed = " + + dialect.booleanValue(false) + + " AND id = ?")) { + stmt.setString(1, entryId); + stmt.setQueryTimeout(writeLockTimeoutSeconds); + return stmt.executeUpdate() != 0; + } } @Override @@ -346,12 +348,14 @@ public List selectActiveTopics(Transaction tx) throws Exception { @Override public Optional nextInTopic(Transaction tx, String topic) throws Exception { - PreparedStatement stmt = - tx.prepareBatchStatement(dialect.fetchAndLockNextInTopic(ALL_FIELDS, tableName)); - stmt.setString(1, topic); - var results = new ArrayList(1); - gatherResults(stmt, results); - return results.stream().findFirst(); + //noinspection resource + try (PreparedStatement stmt = + tx.connection().prepareStatement(dialect.fetchAndLockNextInTopic(ALL_FIELDS, tableName))) { + stmt.setString(1, topic); + var results = new ArrayList(1); + gatherResults(stmt, results); + return results.stream().findFirst(); + } } @Override diff --git a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java index 2fbbd53f..f79356b1 100644 --- a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java +++ b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/QuarkusTransactionManager.java @@ -15,8 +15,6 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; import javax.sql.DataSource; /** @@ -69,8 +67,6 @@ public T requireTransactionReturns( private final class CdiTransaction implements Transaction { - private final Map preparedStatements = new HashMap<>(); - public Connection connection() { try { return datasource.getConnection(); @@ -81,34 +77,26 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - return preparedStatements.computeIfAbsent( - sql, - s -> - Utils.uncheckedly( - () -> { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> - BatchCountingStatementHandler.countBatches( - connection().prepareStatement(sql))); - - tsr.registerInterposedSynchronization( - new Synchronization() { - @Override - public void beforeCompletion() { - if (preparedStatement.getBatchCount() != 0) { - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - - return preparedStatement; - })); + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); + + tsr.registerInterposedSynchronization( + new Synchronization() { + @Override + public void beforeCompletion() { + if (preparedStatement.getBatchCount() != 0) { + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + + return preparedStatement; } @Override diff --git a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java index e3cec0e6..f93a2bee 100644 --- a/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java +++ b/transactionoutbox-quarkus/src/main/java/com/gruelbox/transactionoutbox/quarkus/QuarkusTransactionManager.java @@ -17,8 +17,6 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; import javax.sql.DataSource; /** Transaction manager which uses cdi and quarkus. */ @@ -68,8 +66,6 @@ public T requireTransactionReturns( private final class CdiTransaction implements Transaction { - private final Map preparedStatements = new HashMap<>(); - public Connection connection() { try { return datasource.getConnection(); @@ -80,34 +76,26 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - return preparedStatements.computeIfAbsent( - sql, - s -> - Utils.uncheckedly( - () -> { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> - BatchCountingStatementHandler.countBatches( - connection().prepareStatement(sql))); - - tsr.registerInterposedSynchronization( - new Synchronization() { - @Override - public void beforeCompletion() { - if (preparedStatement.getBatchCount() != 0) { - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - - return preparedStatement; - })); + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); + + tsr.registerInterposedSynchronization( + new Synchronization() { + @Override + public void beforeCompletion() { + if (preparedStatement.getBatchCount() != 0) { + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + + return preparedStatement; } @Override diff --git a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java index 14bd3a08..9685db7d 100644 --- a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java +++ b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/SpringTransactionManager.java @@ -9,8 +9,6 @@ import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.PreparedStatement; -import java.util.HashMap; -import java.util.Map; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -83,8 +81,6 @@ public T requireTransactionReturns( private final class SpringTransaction implements Transaction { - private final Map preparedStatements = new HashMap<>(); - @Override public Connection connection() { return DataSourceUtils.getConnection(dataSource); @@ -92,33 +88,25 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - return preparedStatements.computeIfAbsent( - sql, - s -> - Utils.uncheckedly( - () -> { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> - BatchCountingStatementHandler.countBatches( - connection().prepareStatement(sql))); - TransactionSynchronizationManager.registerSynchronization( - new TransactionSynchronization() { - @Override - public void beforeCommit(boolean readOnly) { - if (preparedStatement.getBatchCount() != 0) { - log.debug("Flushing batches"); - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - return preparedStatement; - })); + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); + TransactionSynchronizationManager.registerSynchronization( + new TransactionSynchronization() { + @Override + public void beforeCommit(boolean readOnly) { + if (preparedStatement.getBatchCount() != 0) { + log.debug("Flushing batches"); + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + return preparedStatement; } @Override diff --git a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java index 738aad6a..51a0023f 100644 --- a/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java +++ b/transactionoutbox-spring/src/main/java/com/gruelbox/transactionoutbox/spring/SpringTransactionManager.java @@ -10,8 +10,6 @@ import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.PreparedStatement; -import java.util.HashMap; -import java.util.Map; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -81,8 +79,6 @@ public T requireTransactionReturns( private final class SpringTransaction implements Transaction { - private final Map preparedStatements = new HashMap<>(); - @Override public Connection connection() { return DataSourceUtils.getConnection(dataSource); @@ -90,33 +86,25 @@ public Connection connection() { @Override public PreparedStatement prepareBatchStatement(String sql) { - return preparedStatements.computeIfAbsent( - sql, - s -> - Utils.uncheckedly( - () -> { - BatchCountingStatement preparedStatement = - Utils.uncheckedly( - () -> - BatchCountingStatementHandler.countBatches( - connection().prepareStatement(sql))); - TransactionSynchronizationManager.registerSynchronization( - new TransactionSynchronization() { - @Override - public void beforeCommit(boolean readOnly) { - if (preparedStatement.getBatchCount() != 0) { - log.debug("Flushing batches"); - Utils.uncheck(preparedStatement::executeBatch); - } - } - - @Override - public void afterCompletion(int status) { - Utils.safelyClose(preparedStatement); - } - }); - return preparedStatement; - })); + BatchCountingStatement preparedStatement = + Utils.uncheckedly( + () -> BatchCountingStatementHandler.countBatches(connection().prepareStatement(sql))); + TransactionSynchronizationManager.registerSynchronization( + new TransactionSynchronization() { + @Override + public void beforeCommit(boolean readOnly) { + if (preparedStatement.getBatchCount() != 0) { + log.debug("Flushing batches"); + Utils.uncheck(preparedStatement::executeBatch); + } + } + + @Override + public void afterCompletion(int status) { + Utils.safelyClose(preparedStatement); + } + }); + return preparedStatement; } @Override From 3997d3ff732d2ee95c1214791d27df56030c4b60 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 18 Mar 2024 00:24:54 +0000 Subject: [PATCH 09/11] Tweak test so timeouts are more tolerant --- .../transactionoutbox/testing/AbstractAcceptanceTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index ab08a8ff..0cb64387 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -519,12 +519,12 @@ final void lastAttemptTime_updatesEveryTime() throws Exception { () -> { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); - assertTrue(blockLatch.await(10, SECONDS)); + assertTrue(blockLatch.await(20, SECONDS)); assertTrue( (Boolean) transactionManager.inTransactionReturns( tx -> outbox.unblock(orderedEntryListener.getBlocked().getId()))); - assertTrue(successLatch.await(10, SECONDS)); + assertTrue(successLatch.await(20, SECONDS)); var orderedEntryEvents = orderedEntryListener.getOrderedEntries(); log.info("The entry life cycle is: {}", orderedEntryEvents); @@ -573,12 +573,12 @@ final void blockAndThenUnblockForRetry() throws Exception { () -> { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); - assertTrue(blockLatch.await(3, SECONDS)); + assertTrue(blockLatch.await(5, SECONDS)); assertTrue( (Boolean) transactionManager.inTransactionReturns( tx -> outbox.unblock(latchListener.getBlocked().getId()))); - assertTrue(successLatch.await(3, SECONDS)); + assertTrue(successLatch.await(5, SECONDS)); }); } From 51d523fd55bde0aa81d9ec3f52500edaa7033d04 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 18 Mar 2024 16:59:16 +0000 Subject: [PATCH 10/11] Fix race conditions in tests --- .../testing/AbstractAcceptanceTest.java | 98 ++++++++++--------- .../transactionoutbox/testing/BaseTest.java | 8 +- .../testing/OrderedEntryListener.java | 44 ++++----- 3 files changed, 82 insertions(+), 68 deletions(-) diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 0cb64387..1eacda39 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -22,12 +22,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.LongStream; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,48 +36,42 @@ public abstract class AbstractAcceptanceTest extends BaseTest { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptanceTest.class); - private final ExecutorService unreliablePool = - new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16)); + private ExecutorService unreliablePool; + private ExecutorService singleThreadPool; private static final Random random = new Random(); + @BeforeEach + void beforeEachBase() { + unreliablePool = + new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16)); + singleThreadPool = Executors.newSingleThreadExecutor(); + } + + @AfterEach + void afterEachBase() throws InterruptedException { + unreliablePool.shutdown(); + singleThreadPool.shutdown(); + assertTrue(unreliablePool.awaitTermination(30, SECONDS)); + assertTrue(singleThreadPool.awaitTermination(30, SECONDS)); + } + @Test final void sequencing() throws Exception { int countPerTopic = 50; - AtomicInteger lastSequenceTopic1 = new AtomicInteger(); - AtomicInteger lastSequenceTopic2 = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(countPerTopic * 2); - TransactionManager transactionManager = txManager(); + OrderedEntryListener orderedEntryListener = + new OrderedEntryListener(latch, new CountDownLatch(1)); TransactionOutbox outbox = TransactionOutbox.builder() .transactionManager(transactionManager) .submitter(Submitter.withExecutor(unreliablePool)) .attemptFrequency(Duration.ofMillis(500)) - .instantiator( - Instantiator.using( - clazz -> - (InterfaceProcessor) - (foo, bar) -> { - if (random.nextInt(10) == 5) { - throw new RuntimeException( - "Temporary failure of InterfaceProcessor"); - } - var lastSequence = - ("topic1".equals(bar)) ? lastSequenceTopic1 : lastSequenceTopic2; - if ((lastSequence.get() == foo - 1)) { - lastSequence.set(foo); - latch.countDown(); - } else { - latch.countDown(); - throw new IllegalStateException( - String.format( - "Ordering violated (previous=%s, seq=%s", - lastSequence.get(), foo)); - } - })) + .instantiator(new RandomFailingInstantiator()) .persistor(persistor()) + .listener(orderedEntryListener) .initializeImmediately(false) .build(); @@ -104,8 +98,19 @@ final void sequencing() throws Exception { }); } assertTrue(latch.await(30, SECONDS)); - assertEquals(countPerTopic, lastSequenceTopic1.get()); - assertEquals(countPerTopic, lastSequenceTopic2.get()); + var indexes = LongStream.range(1, countPerTopic + 1).boxed().collect(Collectors.toList()); + assertEquals( + indexes, + orderedEntryListener.getSuccesses().stream() + .filter(it -> "topic1".equals(it.getTopic())) + .map(TransactionOutboxEntry::getSequence) + .collect(Collectors.toList())); + assertEquals( + indexes, + orderedEntryListener.getSuccesses().stream() + .filter(it -> "topic2".equals(it.getTopic())) + .map(TransactionOutboxEntry::getSequence) + .collect(Collectors.toList())); }); } @@ -435,7 +440,7 @@ final void retryBehaviour() throws Exception { .transactionManager(transactionManager) .persistor(Persistor.forDialect(connectionDetails().dialect())) .instantiator(new FailingInstantiator(attempts)) - .submitter(Submitter.withExecutor(unreliablePool)) + .submitter(Submitter.withExecutor(singleThreadPool)) .attemptFrequency(Duration.ofMillis(500)) .listener(new LatchListener(latch)) .build(); @@ -448,7 +453,8 @@ final void retryBehaviour() throws Exception { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); assertTrue(latch.await(15, SECONDS)); - }); + }, + singleThreadPool); } @Test @@ -506,7 +512,7 @@ final void lastAttemptTime_updatesEveryTime() throws Exception { .transactionManager(transactionManager) .persistor(Persistor.forDialect(connectionDetails().dialect())) .instantiator(new FailingInstantiator(attempts)) - .submitter(Submitter.withExecutor(unreliablePool)) + .submitter(Submitter.withExecutor(singleThreadPool)) .attemptFrequency(Duration.ofMillis(500)) .listener(orderedEntryListener) .blockAfterAttempts(2) @@ -519,30 +525,30 @@ final void lastAttemptTime_updatesEveryTime() throws Exception { () -> { transactionManager.inTransaction( () -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee")); - assertTrue(blockLatch.await(20, SECONDS)); + assertTrue(blockLatch.await(20, SECONDS), "Entry was not blocked"); assertTrue( (Boolean) transactionManager.inTransactionReturns( tx -> outbox.unblock(orderedEntryListener.getBlocked().getId()))); - assertTrue(successLatch.await(20, SECONDS)); - var orderedEntryEvents = orderedEntryListener.getOrderedEntries(); - log.info("The entry life cycle is: {}", orderedEntryEvents); + assertTrue(successLatch.await(20, SECONDS), "Timeout waiting for success"); + var events = orderedEntryListener.getEvents(); + log.info("The entry life cycle is: {}", events); // then we are only dealing in terms of a single outbox entry. - assertEquals( - 1, orderedEntryEvents.stream().map(TransactionOutboxEntry::getId).distinct().count()); + assertEquals(1, events.stream().map(TransactionOutboxEntry::getId).distinct().count()); // the first, scheduled entry has no lastAttemptTime set - assertNull(orderedEntryEvents.get(0).getLastAttemptTime()); + assertNull(events.get(0).getLastAttemptTime()); // all subsequent entries (2 x failures (second of which 'blocks'), 1x success updates // against db) have a distinct lastAttemptTime set on them. assertEquals( 3, - orderedEntryEvents.stream() + events.stream() .skip(1) .map(TransactionOutboxEntry::getLastAttemptTime) .distinct() .count()); - }); + }, + singleThreadPool); } /** @@ -561,6 +567,7 @@ final void blockAndThenUnblockForRetry() throws Exception { .transactionManager(transactionManager) .persistor(Persistor.forDialect(connectionDetails().dialect())) .instantiator(new FailingInstantiator(attempts)) + .submitter(Submitter.withExecutor(singleThreadPool)) .attemptFrequency(Duration.ofMillis(500)) .listener(latchListener) .blockAfterAttempts(2) @@ -579,7 +586,8 @@ final void blockAndThenUnblockForRetry() throws Exception { transactionManager.inTransactionReturns( tx -> outbox.unblock(latchListener.getBlocked().getId()))); assertTrue(successLatch.await(5, SECONDS)); - }); + }, + singleThreadPool); } /** Hammers high-volume, frequently failing tasks to ensure that they all get run. */ diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java index 4145c4f2..617c14a3 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/BaseTest.java @@ -4,6 +4,7 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.SQLException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -73,6 +74,11 @@ protected void clearOutbox() { protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable runnable) throws Exception { + withRunningFlusher(outbox, runnable, flushExecutor); + } + + protected void withRunningFlusher( + TransactionOutbox outbox, ThrowingRunnable runnable, Executor executor) throws Exception { Thread backgroundThread = new Thread( () -> { @@ -80,7 +86,7 @@ protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable run try { // Keep flushing work until there's nothing left to flush //noinspection StatementWithEmptyBody - while (outbox.flush(flushExecutor)) {} + while (outbox.flush(executor)) {} } catch (Exception e) { log.error("Error flushing transaction outbox", e); } diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java index e91fcf56..db6245bc 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java @@ -6,39 +6,46 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * Collects an ordered list of all entry events (*excluding blocked events) that have hit this * listener until a specified number of blocks / successes have occurred. */ +@Slf4j public final class OrderedEntryListener implements TransactionOutboxListener { private final CountDownLatch successLatch; private final CountDownLatch blockedLatch; @Getter private volatile TransactionOutboxEntry blocked; - private final CopyOnWriteArrayList orderedEntries; + private final CopyOnWriteArrayList events = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList successes = + new CopyOnWriteArrayList<>(); OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) { this.successLatch = successLatch; this.blockedLatch = blockedLatch; - orderedEntries = new CopyOnWriteArrayList<>(); } @Override public void scheduled(TransactionOutboxEntry entry) { - orderedEntries.add(from(entry)); + events.add(from(entry)); } @Override public void success(TransactionOutboxEntry entry) { - orderedEntries.add(from(entry)); + var copy = from(entry); + events.add(copy); + successes.add(copy); + log.info( + "Received success #{}. Counting down at {}", successes.size(), successLatch.getCount()); successLatch.countDown(); } @Override public void failure(TransactionOutboxEntry entry, Throwable cause) { - orderedEntries.add(from(entry)); + events.add(from(entry)); } @Override @@ -50,28 +57,21 @@ public void blocked(TransactionOutboxEntry entry, Throwable cause) { } /** - * Retrieve an unmodifiable copy of {@link #orderedEntries}. Beware, expectation is that this does - * not/ should not get accessed until the correct number of {@link - * #success(TransactionOutboxEntry)} and {@link #blocked(TransactionOutboxEntry, Throwable)}} - * counts have occurred. + * Retrieve an unmodifiable copy of {@link #events}. Beware, expectation is that this does not/ + * should not get accessed until the correct number of {@link #success(TransactionOutboxEntry)} + * and {@link #blocked(TransactionOutboxEntry, Throwable)}} counts have occurred. * * @return unmodifiable list of ordered outbox entry events. */ - public List getOrderedEntries() { - return List.copyOf(orderedEntries); + public List getEvents() { + return List.copyOf(events); + } + + public List getSuccesses() { + return List.copyOf(successes); } private TransactionOutboxEntry from(TransactionOutboxEntry entry) { - return TransactionOutboxEntry.builder() - .id(entry.getId()) - .uniqueRequestId(entry.getUniqueRequestId()) - .invocation(entry.getInvocation()) - .lastAttemptTime(entry.getLastAttemptTime()) - .nextAttemptTime(entry.getNextAttemptTime()) - .attempts(entry.getAttempts()) - .blocked(entry.isBlocked()) - .processed(entry.isProcessed()) - .version(entry.getVersion()) - .build(); + return entry.toBuilder().build(); } } From d983747b7ee74a05a65946cdf24aef590ef06aa7 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Mon, 18 Mar 2024 23:16:02 +0000 Subject: [PATCH 11/11] Small tidy-ups --- .../gruelbox/transactionoutbox/Persistor.java | 15 +++ .../TransactionOutboxImpl.java | 104 ++++++------------ .../TransactionOutboxListener.java | 2 + 3 files changed, 52 insertions(+), 69 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java index 1590911c..bb54baa9 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java @@ -106,8 +106,23 @@ static DefaultPersistor forDialect(Dialect dialect) { List selectBatch(Transaction tx, int batchSize, Instant now) throws Exception; + /** + * Selects the list of topics with work awaiting processing. + * + * @param tx The current {@link Transaction}. + * @return The topics. + * @throws Exception Any exception. + */ List selectActiveTopics(final Transaction tx) throws Exception; + /** + * Fetches and locks the next available piece of work on the specified topic. + * + * @param tx The current {@link Transaction}. + * @param topic The topic. + * @return The next available piece of work on the selected topic. + * @throws Exception ANy exception. + */ Optional nextInTopic(Transaction tx, String topic) throws Exception; /** diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 482f7058..10436da5 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -9,21 +9,26 @@ import com.gruelbox.transactionoutbox.spi.Utils; import java.lang.reflect.InvocationTargetException; import java.time.*; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.ToString; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.slf4j.event.Level; @Slf4j -class TransactionOutboxImpl implements TransactionOutbox, Validatable { - - private static final int DEFAULT_FLUSH_BATCH_SIZE = 4096; +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +final class TransactionOutboxImpl implements TransactionOutbox, Validatable { private final TransactionManager transactionManager; private final Persistor persistor; @@ -41,39 +46,6 @@ class TransactionOutboxImpl implements TransactionOutbox, Validatable { private final AtomicBoolean initialized = new AtomicBoolean(); private final ProxyFactory proxyFactory = new ProxyFactory(); - private TransactionOutboxImpl( - TransactionManager transactionManager, - Instantiator instantiator, - Submitter submitter, - Duration attemptFrequency, - int blockAfterAttempts, - int flushBatchSize, - Supplier clockProvider, - TransactionOutboxListener listener, - Persistor persistor, - Level logLevelTemporaryFailure, - Boolean serializeMdc, - Duration retentionThreshold, - Boolean initializeImmediately) { - this.transactionManager = transactionManager; - this.instantiator = Utils.firstNonNull(instantiator, Instantiator::usingReflection); - this.persistor = persistor; - this.submitter = Utils.firstNonNull(submitter, Submitter::withDefaultExecutor); - this.attemptFrequency = Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES)); - this.blockAfterAttempts = blockAfterAttempts < 1 ? 5 : blockAfterAttempts; - this.flushBatchSize = flushBatchSize < 1 ? DEFAULT_FLUSH_BATCH_SIZE : flushBatchSize; - this.clockProvider = clockProvider == null ? Clock::systemDefaultZone : clockProvider; - this.listener = Utils.firstNonNull(listener, () -> new TransactionOutboxListener() {}); - this.logLevelTemporaryFailure = Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN); - this.validator = new Validator(this.clockProvider); - this.serializeMdc = serializeMdc == null || serializeMdc; - this.retentionThreshold = retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold; - this.validator.validate(this); - if (initializeImmediately == null || initializeImmediately) { - initialize(); - } - } - @Override public void validate(Validator validator) { validator.notNull("transactionManager", transactionManager); @@ -424,10 +396,7 @@ private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) { entry.setAttempts(entry.getAttempts() + 1); var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts); entry.setBlocked(blocked); - entry.setLastAttemptTime(Instant.now(clockProvider.get())); - entry.setNextAttemptTime(after(attemptFrequency)); - validator.validate(entry); - transactionManager.inTransactionThrows(transaction -> persistor.update(transaction, entry)); + transactionManager.inTransactionThrows(tx -> pushBack(tx, entry)); listener.failure(entry, cause); if (blocked) { log.error( @@ -462,46 +431,43 @@ static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder { } public TransactionOutboxImpl build() { - return new TransactionOutboxImpl( - transactionManager, - instantiator, - submitter, - attemptFrequency, - blockAfterAttempts, - flushBatchSize, - clockProvider, - listener, - persistor, - logLevelTemporaryFailure, - serializeMdc, - retentionThreshold, - initializeImmediately); + Validator validator = new Validator(this.clockProvider); + TransactionOutboxImpl impl = + new TransactionOutboxImpl( + transactionManager, + persistor, + Utils.firstNonNull(instantiator, Instantiator::usingReflection), + Utils.firstNonNull(submitter, Submitter::withDefaultExecutor), + Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES)), + Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN), + blockAfterAttempts < 1 ? 5 : blockAfterAttempts, + flushBatchSize < 1 ? 4096 : flushBatchSize, + clockProvider == null ? Clock::systemDefaultZone : clockProvider, + Utils.firstNonNull(listener, () -> TransactionOutboxListener.EMPTY), + serializeMdc == null || serializeMdc, + validator, + retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold); + validator.validate(impl); + if (initializeImmediately == null || initializeImmediately) { + impl.initialize(); + } + return impl; } } + @Accessors(fluent = true, chain = true) + @Setter private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleBuilder { private String uniqueRequestId; - private String topic; - - @Override - public ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId) { - this.uniqueRequestId = uniqueRequestId; - return this; - } - - @Override - public ParameterizedScheduleBuilder ordered(String topic) { - this.topic = topic; - return this; - } + private String ordered; @Override public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, topic); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered); } } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java index d7c9f2f4..c0240cb8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxListener.java @@ -5,6 +5,8 @@ /** A listener for events fired by {@link TransactionOutbox}. */ public interface TransactionOutboxListener { + TransactionOutboxListener EMPTY = new TransactionOutboxListener() {}; + /** * Fired when a transaction outbox task is scheduled. *