Skip to content

Commit

Permalink
Merge branch 'master' into reuse-containers
Browse files Browse the repository at this point in the history
  • Loading branch information
badgerwithagun authored Mar 21, 2024
2 parents b4f2e9f + be14414 commit ea3077f
Show file tree
Hide file tree
Showing 29 changed files with 159 additions and 539 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Build, publish to GPR and tag
run: |
if [ "$GITHUB_REPOSITORY" == "gruelbox/transaction-outbox" ]; then
revision="5.5.$GITHUB_RUN_NUMBER"
revision="6.0.$GITHUB_RUN_NUMBER"
echo "Building $revision at $GITHUB_SHA"
mvn -Pconcise,delombok -B deploy -s $GITHUB_WORKSPACE/settings.xml -Drevision="$revision" -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
echo "Tagging $revision"
Expand Down
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ A flexible implementation of the [Transaction Outbox Pattern](https://microservi
1. [Topics and FIFO ordering](#topics-and-fifo-ordering)
1. [The nested outbox pattern](#the-nested-outbox-pattern)
1. [Idempotency protection](#idempotency-protection)
1. [Delayed/scheduled processing](#delayedscheduled-processing)
1. [Flexible serialization](#flexible-serialization-beta)
1. [Clustering](#clustering)
1. [Configuration reference](#configuration-reference)
Expand Down Expand Up @@ -124,14 +125,14 @@ The latest stable release is available from Maven Central. Stable releases are [
<dependency>
<groupId>com.gruelbox</groupId>
<artifactId>transactionoutbox-core</artifactId>
<version>5.4.421</version>
<version>5.5.447</version>
</dependency>
```

#### Gradle

```groovy
implementation 'com.gruelbox:transactionoutbox-core:5.4.421'
implementation 'com.gruelbox:transactionoutbox-core:5.5.447'
```

### Development snapshots
Expand Down Expand Up @@ -397,6 +398,21 @@ outbox.with()
Where `context-clientid` is a globally-unique identifier derived from the incoming request. Such ids are usually available from queue middleware as message ids, or if not you can require as part of the incoming API (possibly with a tenant prefix to ensure global uniqueness across tenants).
### Delayed/scheduled processing ###
To delay execution of a task, use:
```java
outbox.with()
.delayForAtLeast(Duration.of(5, MINUTES))
.schedule(Service.class)
.process("Foo");
```
There are some caveats around how accurate timing is. See the JavaDoc on the `delayForAtLeast` method for more information.
This is particularly useful when combined with the [nested outbox pattern](#the-nested-outbox-pattern) for creating polling/repeated or recursive tasks to throttle prcessing.
### Flexible serialization (beta)
Most people will use the default persistor, `DefaultPersistor`, to persist tasks to a relational database. This uses `DefaultInvocationSerializer` by default, which in turn uses [GSON](https://github.com/google/gson) to serialize as JSON. `DefaultInvocationSerializer` is extremely limited by design, with a small list of allowed classes in method arguments.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<!-- Dependency versions -->
<logback.version>1.5.3</logback.version>
<revision>1.3.99999-SNAPSHOT</revision>
<junit.jupiter.version>5.10.1</junit.jupiter.version>
<junit.jupiter.version>5.10.2</junit.jupiter.version>
<testcontainers.version>1.19.7</testcontainers.version>
<maven.source.plugin.version>3.3.0</maven.source.plugin.version>
<nexus.staging.plugin.version>1.6.13</nexus.staging.plugin.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.gruelbox.transactionoutbox.acceptance;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.gruelbox.transactionoutbox.*;
import com.gruelbox.transactionoutbox.testing.AbstractAcceptanceTest;
import com.gruelbox.transactionoutbox.testing.InterfaceProcessor;
import com.gruelbox.transactionoutbox.testing.LatchListener;
import com.gruelbox.transactionoutbox.testing.OrderedEntryListener;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
Expand All @@ -17,6 +21,64 @@ class TestH2 extends AbstractAcceptanceTest {

static final ThreadLocal<Boolean> inWrappedInvocation = ThreadLocal.withInitial(() -> false);

@Test
final void delayedExecutionImmediateSubmission() throws InterruptedException {

CountDownLatch latch = new CountDownLatch(1);
TransactionManager transactionManager = txManager();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
.listener(new OrderedEntryListener(latch, new CountDownLatch(1)))
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.attemptFrequency(Duration.ofSeconds(60))
.build();

outbox.initialize();
clearOutbox();

var start = Instant.now();
transactionManager.inTransaction(
() ->
outbox
.with()
.delayForAtLeast(Duration.ofSeconds(1))
.schedule(InterfaceProcessor.class)
.process(1, "bar"));
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(start.plus(Duration.ofSeconds(1)).isBefore(Instant.now()));
}

@Test
final void delayedExecutionFlushOnly() throws Exception {

CountDownLatch latch = new CountDownLatch(1);
TransactionManager transactionManager = txManager();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.instantiator(Instantiator.using(clazz -> (InterfaceProcessor) (foo, bar) -> {}))
.listener(new OrderedEntryListener(latch, new CountDownLatch(1)))
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.attemptFrequency(Duration.ofSeconds(1))
.build();

outbox.initialize();
clearOutbox();

transactionManager.inTransaction(
() ->
outbox
.with()
.delayForAtLeast(Duration.ofSeconds(2))
.schedule(InterfaceProcessor.class)
.process(1, "bar"));
assertFalse(latch.await(3, TimeUnit.SECONDS));

withRunningFlusher(outbox, () -> assertTrue(latch.await(3, TimeUnit.SECONDS)));
}

@Test
final void wrapInvocations() throws InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,32 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder ordered(String topic);

/**
* Instructs the scheduler to delay processing the task until after the specified duration. This
* can be used for simple job scheduling or to introduce an asynchronous delay into chains of
* tasks.
*
* <p>Note that any delay is <em>not precise</em> and accuracy is primarily determined by the
* frequency at which {@link #flush(Executor)} or {@link #flush()} are called. Do not use this
* for time-sensitive tasks, particularly if the duration exceeds {@link
* TransactionOutboxBuilder#attemptFrequency(Duration)} (see more on this below).
*
* <p>A note on implementation: tasks (when {@link #ordered(String)} is not used) are normally
* submitted for processing on the local JVM immediately after transaction commit. By default,
* when a delay is introduced, the work is instead submitted to a {@link
* java.util.concurrent.ScheduledExecutorService} for processing after the specified delay.
* However, if the delay is long enough that the work would likely get picked up by a {@link
* #flush()} on this JVM or another, this is pointless and wasteful. Unfortunately, we don't
* know exactly how frequently {@link #flush()} will be called! To mitigate this, Any task
* submitted with a delay in excess of {@link
* TransactionOutboxBuilder#attemptFrequency(Duration)} will be assumed to get picked up by a
* future flush.
*
* @param duration The minimum delay duration.
* @return Builder.
*/
ParameterizedScheduleBuilder delayForAtLeast(Duration duration);

/**
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
* to the request as configured using {@link TransactionOutbox#with()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -45,6 +44,7 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private final Duration retentionThreshold;
private final AtomicBoolean initialized = new AtomicBoolean();
private final ProxyFactory proxyFactory = new ProxyFactory();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

@Override
public void validate(Validator validator) {
Expand Down Expand Up @@ -79,7 +79,7 @@ public void initialize() {

@Override
public <T> T schedule(Class<T> clazz) {
return schedule(clazz, null, null);
return schedule(clazz, null, null, null);
}

@Override
Expand Down Expand Up @@ -208,7 +208,8 @@ public boolean unblock(String entryId, Object transactionContext) {
}
}

private <T> T schedule(Class<T> clazz, String uniqueRequestId, String topic) {
private <T> T schedule(
Class<T> clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) {
if (!initialized.get()) {
throw new IllegalStateException("Not initialized");
}
Expand All @@ -226,19 +227,38 @@ private <T> T schedule(Class<T> clazz, String uniqueRequestId, String topic) {
extracted.getArgs(),
uniqueRequestId,
topic);
if (delayForAtLeast != null) {
entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast));
}
validator.validate(entry);
persistor.save(extracted.getTransaction(), entry);
extracted
.getTransaction()
.addPostCommitHook(
() -> {
listener.scheduled(entry);
if (entry.getTopic() == null) {
if (entry.getTopic() != null) {
log.debug("Queued {} in topic {}", entry.description(), topic);
} else if (delayForAtLeast == null) {
submitNow(entry);
log.debug(
"Scheduled {} for post-commit execution", entry.description());
} else if (delayForAtLeast.compareTo(attemptFrequency) < 0) {
scheduler.schedule(
() -> submitNow(entry),
delayForAtLeast.toMillis(),
TimeUnit.MILLISECONDS);
log.info(
"Scheduled {} for post-commit execution after at least {}",
entry.description(),
delayForAtLeast);
} else {
log.info(
"Queued {} for execution after at least {}",
entry.description(),
delayForAtLeast);
}
});
log.debug(
"Scheduled {} for running after transaction commit", entry.description());
return null;
}));
}
Expand Down Expand Up @@ -461,13 +481,14 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB

private String uniqueRequestId;
private String ordered;
private Duration delayForAtLeast;

@Override
public <T> T schedule(Class<T> clazz) {
if (uniqueRequestId != null && uniqueRequestId.length() > 250) {
throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
}
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered);
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast);
}
}
}
4 changes: 2 additions & 2 deletions transactionoutbox-guice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ Extension for [transaction-outbox-core](../README.md) which integrates with Guic
<dependency>
<groupId>com.gruelbox</groupId>
<artifactId>transactionoutbox-guice</artifactId>
<version>5.4.421</version>
<version>5.5.447</version>
</dependency>
```

#### Gradle

```groovy
implementation 'com.gruelbox:transactionoutbox-guice:5.4.421'
implementation 'com.gruelbox:transactionoutbox-guice:5.5.447'
```

### Development snapshots
Expand Down

This file was deleted.

4 changes: 2 additions & 2 deletions transactionoutbox-jackson/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ which is why it is not included in the core library.
<dependency>
<groupId>com.gruelbox</groupId>
<artifactId>transactionoutbox-jackson</artifactId>
<version>5.4.421</version>
<version>5.5.447</version>
</dependency>
```

#### Gradle

```groovy
implementation 'com.gruelbox:transactionoutbox-jackson:5.4.421'
implementation 'com.gruelbox:transactionoutbox-jackson:5.5.447'
```

### Development snapshots
Expand Down
4 changes: 2 additions & 2 deletions transactionoutbox-jooq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ jOOQ gives you the option to either use thread-local transaction management or e
<dependency>
<groupId>com.gruelbox</groupId>
<artifactId>transactionoutbox-jooq</artifactId>
<version>5.4.421</version>
<version>5.5.447</version>
</dependency>
```

#### Gradle

```groovy
implementation 'com.gruelbox:transactionoutbox-jooq:5.4.421'
implementation 'com.gruelbox:transactionoutbox-jooq:5.5.447'
```

### Development snapshots
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit ea3077f

Please sign in to comment.