Skip to content

Commit

Permalink
Merge pull request #595 from gruelbox/scheduled-execution
Browse files Browse the repository at this point in the history
Allow for execution to be delayed
  • Loading branch information
badgerwithagun authored Mar 20, 2024
2 parents 94bb2ad + 0fae04b commit 58cf651
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.gruelbox.transactionoutbox.acceptance;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.gruelbox.transactionoutbox.*;
import com.gruelbox.transactionoutbox.testing.AbstractAcceptanceTest;
import com.gruelbox.transactionoutbox.testing.InterfaceProcessor;
import com.gruelbox.transactionoutbox.testing.LatchListener;
import com.gruelbox.transactionoutbox.testing.OrderedEntryListener;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
Expand All @@ -17,6 +21,64 @@ class TestH2 extends AbstractAcceptanceTest {

static final ThreadLocal<Boolean> inWrappedInvocation = ThreadLocal.withInitial(() -> false);

@Test
final void delayedExecutionImmediateSubmission() throws InterruptedException {

CountDownLatch latch = new CountDownLatch(1);
TransactionManager transactionManager = txManager();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
.listener(new OrderedEntryListener(latch, new CountDownLatch(1)))
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.attemptFrequency(Duration.ofSeconds(60))
.build();

outbox.initialize();
clearOutbox();

var start = Instant.now();
transactionManager.inTransaction(
() ->
outbox
.with()
.delayForAtLeast(Duration.ofSeconds(1))
.schedule(InterfaceProcessor.class)
.process(1, "bar"));
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(start.plus(Duration.ofSeconds(1)).isBefore(Instant.now()));
}

@Test
final void delayedExecutionFlushOnly() throws Exception {

CountDownLatch latch = new CountDownLatch(1);
TransactionManager transactionManager = txManager();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
.listener(new OrderedEntryListener(latch, new CountDownLatch(1)))
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.attemptFrequency(Duration.ofSeconds(1))
.build();

outbox.initialize();
clearOutbox();

transactionManager.inTransaction(
() ->
outbox
.with()
.delayForAtLeast(Duration.ofSeconds(2))
.schedule(InterfaceProcessor.class)
.process(1, "bar"));
assertFalse(latch.await(3, TimeUnit.SECONDS));

withRunningFlusher(outbox, () -> assertTrue(latch.await(3, TimeUnit.SECONDS)));
}

@Test
final void wrapInvocations() throws InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,32 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder ordered(String topic);

/**
* Instructs the scheduler to delay processing the task until after the specified duration. This
* can be used for simple job scheduling or to introduce an asynchronous delay into chains of
* tasks.
*
* <p>Note that any delay is <em>not precise</em> and accuracy is primarily determined by the
* frequency at which {@link #flush(Executor)} or {@link #flush()} are called. Do not use this
* for time-sensitive tasks, particularly if the duration exceeds {@link
* TransactionOutboxBuilder#attemptFrequency(Duration)} (see more on this below).
*
* <p>A note on implementation: tasks (when {@link #ordered(String)} is not used) are normally
* submitted for processing on the local JVM immediately after transaction commit. By default,
* when a delay is introduced, the work is instead submitted to a {@link
* java.util.concurrent.ScheduledExecutorService} for processing after the specified delay.
* However, if the delay is long enough that the work would likely get picked up by a {@link
* #flush()} on this JVM or another, this is pointless and wasteful. Unfortunately, we don't
* know exactly how frequently {@link #flush()} will be called! To mitigate this, Any task
* submitted with a delay in excess of {@link
* TransactionOutboxBuilder#attemptFrequency(Duration)} will be assumed to get picked up by a
* future flush.
*
* @param duration The minimum delay duration.
* @return Builder.
*/
ParameterizedScheduleBuilder delayForAtLeast(Duration duration);

/**
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
* to the request as configured using {@link TransactionOutbox#with()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -45,6 +44,7 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private final Duration retentionThreshold;
private final AtomicBoolean initialized = new AtomicBoolean();
private final ProxyFactory proxyFactory = new ProxyFactory();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

@Override
public void validate(Validator validator) {
Expand Down Expand Up @@ -79,7 +79,7 @@ public void initialize() {

@Override
public <T> T schedule(Class<T> clazz) {
return schedule(clazz, null, null);
return schedule(clazz, null, null, null);
}

@Override
Expand Down Expand Up @@ -208,7 +208,8 @@ public boolean unblock(String entryId, Object transactionContext) {
}
}

private <T> T schedule(Class<T> clazz, String uniqueRequestId, String topic) {
private <T> T schedule(
Class<T> clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
Expand All @@ -226,19 +227,38 @@ private <T> T schedule(Class<T> clazz, String uniqueRequestId, String topic) {
extracted.getArgs(),
uniqueRequestId,
topic);
if (delayForAtLeast != null) {
entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast));
}
validator.validate(entry);
persistor.save(extracted.getTransaction(), entry);
extracted
.getTransaction()
.addPostCommitHook(
() -> {
listener.scheduled(entry);
if (entry.getTopic() == null) {
if (entry.getTopic() != null) {
log.debug("Queued {} in topic {}", entry.description(), topic);
} else if (delayForAtLeast == null) {
submitNow(entry);
log.debug(
"Scheduled {} for post-commit execution", entry.description());
} else if (delayForAtLeast.compareTo(attemptFrequency) < 0) {
scheduler.schedule(
() -> submitNow(entry),
delayForAtLeast.toMillis(),
TimeUnit.MILLISECONDS);
log.info(
"Scheduled {} for post-commit execution after at least {}",
entry.description(),
delayForAtLeast);
} else {
log.info(
"Queued {} for execution after at least {}",
entry.description(),
delayForAtLeast);
}
});
log.debug(
"Scheduled {} for running after transaction commit", entry.description());
return null;
}));
}
Expand Down Expand Up @@ -461,13 +481,14 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB

private String uniqueRequestId;
private String ordered;
private Duration delayForAtLeast;

@Override
public <T> T schedule(Class<T> clazz) {
if (uniqueRequestId != null && uniqueRequestId.length() > 250) {
throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
}
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered);
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public final class OrderedEntryListener implements TransactionOutboxListener {
private final CopyOnWriteArrayList<TransactionOutboxEntry> successes =
new CopyOnWriteArrayList<>();

OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) {
public OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) {
this.successLatch = successLatch;
this.blockedLatch = blockedLatch;
}
Expand Down

0 comments on commit 58cf651

Please sign in to comment.