diff --git a/transactionoutbox-acceptance/src/test/java/com/gruelbox/transactionoutbox/acceptance/TestH2.java b/transactionoutbox-acceptance/src/test/java/com/gruelbox/transactionoutbox/acceptance/TestH2.java index dccc783b..b40e8ef1 100644 --- a/transactionoutbox-acceptance/src/test/java/com/gruelbox/transactionoutbox/acceptance/TestH2.java +++ b/transactionoutbox-acceptance/src/test/java/com/gruelbox/transactionoutbox/acceptance/TestH2.java @@ -1,12 +1,16 @@ package com.gruelbox.transactionoutbox.acceptance; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.gruelbox.transactionoutbox.*; import com.gruelbox.transactionoutbox.testing.AbstractAcceptanceTest; import com.gruelbox.transactionoutbox.testing.InterfaceProcessor; import com.gruelbox.transactionoutbox.testing.LatchListener; +import com.gruelbox.transactionoutbox.testing.OrderedEntryListener; import java.lang.reflect.InvocationTargetException; +import java.time.Duration; +import java.time.Instant; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -17,6 +21,64 @@ class TestH2 extends AbstractAcceptanceTest { static final ThreadLocal inWrappedInvocation = ThreadLocal.withInitial(() -> false); + @Test + final void delayedExecutionImmediateSubmission() throws InterruptedException { + + CountDownLatch latch = new CountDownLatch(1); + TransactionManager transactionManager = txManager(); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {})) + .listener(new OrderedEntryListener(latch, new CountDownLatch(1))) + .persistor(Persistor.forDialect(connectionDetails().dialect())) + .attemptFrequency(Duration.ofSeconds(60)) + .build(); + + outbox.initialize(); + clearOutbox(); + + var start = Instant.now(); + transactionManager.inTransaction( + () -> + outbox + .with() + .delayForAtLeast(Duration.ofSeconds(1)) + .schedule(InterfaceProcessor.class) + .process(1, "bar")); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(start.plus(Duration.ofSeconds(1)).isBefore(Instant.now())); + } + + @Test + final void delayedExecutionFlushOnly() throws Exception { + + CountDownLatch latch = new CountDownLatch(1); + TransactionManager transactionManager = txManager(); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {})) + .listener(new OrderedEntryListener(latch, new CountDownLatch(1))) + .persistor(Persistor.forDialect(connectionDetails().dialect())) + .attemptFrequency(Duration.ofSeconds(1)) + .build(); + + outbox.initialize(); + clearOutbox(); + + transactionManager.inTransaction( + () -> + outbox + .with() + .delayForAtLeast(Duration.ofSeconds(2)) + .schedule(InterfaceProcessor.class) + .process(1, "bar")); + assertFalse(latch.await(3, TimeUnit.SECONDS)); + + withRunningFlusher(outbox, () -> assertTrue(latch.await(3, TimeUnit.SECONDS))); + } + @Test final void wrapInvocations() throws InterruptedException { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index 59bcdf67..51e0ab59 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -368,6 +368,32 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder ordered(String topic); + /** + * Instructs the scheduler to delay processing the task until after the specified duration. This + * can be used for simple job scheduling or to introduce an asynchronous delay into chains of + * tasks. + * + *

Note that any delay is not precise and accuracy is primarily determined by the + * frequency at which {@link #flush(Executor)} or {@link #flush()} are called. Do not use this + * for time-sensitive tasks, particularly if the duration exceeds {@link + * TransactionOutboxBuilder#attemptFrequency(Duration)} (see more on this below). + * + *

A note on implementation: tasks (when {@link #ordered(String)} is not used) are normally + * submitted for processing on the local JVM immediately after transaction commit. By default, + * when a delay is introduced, the work is instead submitted to a {@link + * java.util.concurrent.ScheduledExecutorService} for processing after the specified delay. + * However, if the delay is long enough that the work would likely get picked up by a {@link + * #flush()} on this JVM or another, this is pointless and wasteful. Unfortunately, we don't + * know exactly how frequently {@link #flush()} will be called! To mitigate this, Any task + * submitted with a delay in excess of {@link + * TransactionOutboxBuilder#attemptFrequency(Duration)} will be assumed to get picked up by a + * future flush. + * + * @param duration The minimum delay duration. + * @return Builder. + */ + ParameterizedScheduleBuilder delayForAtLeast(Duration duration); + /** * Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters * to the request as configured using {@link TransactionOutbox#with()}. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 10436da5..b9e1c981 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -12,8 +12,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -45,6 +44,7 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable { private final Duration retentionThreshold; private final AtomicBoolean initialized = new AtomicBoolean(); private final ProxyFactory proxyFactory = new ProxyFactory(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); @Override public void validate(Validator validator) { @@ -79,7 +79,7 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null, null); + return schedule(clazz, null, null, null); } @Override @@ -208,7 +208,8 @@ public boolean unblock(String entryId, Object transactionContext) { } } - private T schedule(Class clazz, String uniqueRequestId, String topic) { + private T schedule( + Class clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -226,6 +227,9 @@ private T schedule(Class clazz, String uniqueRequestId, String topic) { extracted.getArgs(), uniqueRequestId, topic); + if (delayForAtLeast != null) { + entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast)); + } validator.validate(entry); persistor.save(extracted.getTransaction(), entry); extracted @@ -233,12 +237,28 @@ private T schedule(Class clazz, String uniqueRequestId, String topic) { .addPostCommitHook( () -> { listener.scheduled(entry); - if (entry.getTopic() == null) { + if (entry.getTopic() != null) { + log.debug("Queued {} in topic {}", entry.description(), topic); + } else if (delayForAtLeast == null) { submitNow(entry); + log.debug( + "Scheduled {} for post-commit execution", entry.description()); + } else if (delayForAtLeast.compareTo(attemptFrequency) < 0) { + scheduler.schedule( + () -> submitNow(entry), + delayForAtLeast.toMillis(), + TimeUnit.MILLISECONDS); + log.info( + "Scheduled {} for post-commit execution after at least {}", + entry.description(), + delayForAtLeast); + } else { + log.info( + "Queued {} for execution after at least {}", + entry.description(), + delayForAtLeast); } }); - log.debug( - "Scheduled {} for running after transaction commit", entry.description()); return null; })); } @@ -461,13 +481,14 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB private String uniqueRequestId; private String ordered; + private Duration delayForAtLeast; @Override public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered); + return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast); } } } diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java index db6245bc..b80a0111 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/OrderedEntryListener.java @@ -23,7 +23,7 @@ public final class OrderedEntryListener implements TransactionOutboxListener { private final CopyOnWriteArrayList successes = new CopyOnWriteArrayList<>(); - OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) { + public OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) { this.successLatch = successLatch; this.blockedLatch = blockedLatch; }