Skip to content

Commit

Permalink
Fix race conditions in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
badgerwithagun committed Mar 18, 2024
1 parent 3997d3f commit 51d523f
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,48 +36,42 @@ public abstract class AbstractAcceptanceTest extends BaseTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptanceTest.class);

private final ExecutorService unreliablePool =
new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16));
private ExecutorService unreliablePool;
private ExecutorService singleThreadPool;

private static final Random random = new Random();

@BeforeEach
void beforeEachBase() {
unreliablePool =
new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(16));
singleThreadPool = Executors.newSingleThreadExecutor();
}

@AfterEach
void afterEachBase() throws InterruptedException {
unreliablePool.shutdown();
singleThreadPool.shutdown();
assertTrue(unreliablePool.awaitTermination(30, SECONDS));
assertTrue(singleThreadPool.awaitTermination(30, SECONDS));
}

@Test
final void sequencing() throws Exception {
int countPerTopic = 50;

AtomicInteger lastSequenceTopic1 = new AtomicInteger();
AtomicInteger lastSequenceTopic2 = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(countPerTopic * 2);

TransactionManager transactionManager = txManager();
OrderedEntryListener orderedEntryListener =
new OrderedEntryListener(latch, new CountDownLatch(1));
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.submitter(Submitter.withExecutor(unreliablePool))
.attemptFrequency(Duration.ofMillis(500))
.instantiator(
Instantiator.using(
clazz ->
(InterfaceProcessor)
(foo, bar) -> {
if (random.nextInt(10) == 5) {
throw new RuntimeException(
"Temporary failure of InterfaceProcessor");
}
var lastSequence =
("topic1".equals(bar)) ? lastSequenceTopic1 : lastSequenceTopic2;
if ((lastSequence.get() == foo - 1)) {
lastSequence.set(foo);
latch.countDown();
} else {
latch.countDown();
throw new IllegalStateException(
String.format(
"Ordering violated (previous=%s, seq=%s",
lastSequence.get(), foo));
}
}))
.instantiator(new RandomFailingInstantiator())
.persistor(persistor())
.listener(orderedEntryListener)
.initializeImmediately(false)
.build();

Expand All @@ -104,8 +98,19 @@ final void sequencing() throws Exception {
});
}
assertTrue(latch.await(30, SECONDS));
assertEquals(countPerTopic, lastSequenceTopic1.get());
assertEquals(countPerTopic, lastSequenceTopic2.get());
var indexes = LongStream.range(1, countPerTopic + 1).boxed().collect(Collectors.toList());
assertEquals(
indexes,
orderedEntryListener.getSuccesses().stream()
.filter(it -> "topic1".equals(it.getTopic()))
.map(TransactionOutboxEntry::getSequence)
.collect(Collectors.toList()));
assertEquals(
indexes,
orderedEntryListener.getSuccesses().stream()
.filter(it -> "topic2".equals(it.getTopic()))
.map(TransactionOutboxEntry::getSequence)
.collect(Collectors.toList()));
});
}

Expand Down Expand Up @@ -435,7 +440,7 @@ final void retryBehaviour() throws Exception {
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.submitter(Submitter.withExecutor(unreliablePool))
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(new LatchListener(latch))
.build();
Expand All @@ -448,7 +453,8 @@ final void retryBehaviour() throws Exception {
transactionManager.inTransaction(
() -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee"));
assertTrue(latch.await(15, SECONDS));
});
},
singleThreadPool);
}

@Test
Expand Down Expand Up @@ -506,7 +512,7 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.submitter(Submitter.withExecutor(unreliablePool))
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(orderedEntryListener)
.blockAfterAttempts(2)
Expand All @@ -519,30 +525,30 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
() -> {
transactionManager.inTransaction(
() -> outbox.schedule(InterfaceProcessor.class).process(3, "Whee"));
assertTrue(blockLatch.await(20, SECONDS));
assertTrue(blockLatch.await(20, SECONDS), "Entry was not blocked");
assertTrue(
(Boolean)
transactionManager.inTransactionReturns(
tx -> outbox.unblock(orderedEntryListener.getBlocked().getId())));
assertTrue(successLatch.await(20, SECONDS));
var orderedEntryEvents = orderedEntryListener.getOrderedEntries();
log.info("The entry life cycle is: {}", orderedEntryEvents);
assertTrue(successLatch.await(20, SECONDS), "Timeout waiting for success");
var events = orderedEntryListener.getEvents();
log.info("The entry life cycle is: {}", events);

// then we are only dealing in terms of a single outbox entry.
assertEquals(
1, orderedEntryEvents.stream().map(TransactionOutboxEntry::getId).distinct().count());
assertEquals(1, events.stream().map(TransactionOutboxEntry::getId).distinct().count());
// the first, scheduled entry has no lastAttemptTime set
assertNull(orderedEntryEvents.get(0).getLastAttemptTime());
assertNull(events.get(0).getLastAttemptTime());
// all subsequent entries (2 x failures (second of which 'blocks'), 1x success updates
// against db) have a distinct lastAttemptTime set on them.
assertEquals(
3,
orderedEntryEvents.stream()
events.stream()
.skip(1)
.map(TransactionOutboxEntry::getLastAttemptTime)
.distinct()
.count());
});
},
singleThreadPool);
}

/**
Expand All @@ -561,6 +567,7 @@ final void blockAndThenUnblockForRetry() throws Exception {
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.instantiator(new FailingInstantiator(attempts))
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(latchListener)
.blockAfterAttempts(2)
Expand All @@ -579,7 +586,8 @@ final void blockAndThenUnblockForRetry() throws Exception {
transactionManager.inTransactionReturns(
tx -> outbox.unblock(latchListener.getBlocked().getId())));
assertTrue(successLatch.await(5, SECONDS));
});
},
singleThreadPool);
}

/** Hammers high-volume, frequently failing tasks to ensure that they all get run. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,14 +74,19 @@ protected void clearOutbox() {

protected void withRunningFlusher(TransactionOutbox outbox, ThrowingRunnable runnable)
throws Exception {
withRunningFlusher(outbox, runnable, flushExecutor);
}

protected void withRunningFlusher(
TransactionOutbox outbox, ThrowingRunnable runnable, Executor executor) throws Exception {
Thread backgroundThread =
new Thread(
() -> {
while (!Thread.interrupted()) {
try {
// Keep flushing work until there's nothing left to flush
//noinspection StatementWithEmptyBody
while (outbox.flush(flushExecutor)) {}
while (outbox.flush(executor)) {}
} catch (Exception e) {
log.error("Error flushing transaction outbox", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,46 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* Collects an ordered list of all entry events (*excluding blocked events) that have hit this
* listener until a specified number of blocks / successes have occurred.
*/
@Slf4j
public final class OrderedEntryListener implements TransactionOutboxListener {
private final CountDownLatch successLatch;
private final CountDownLatch blockedLatch;

@Getter private volatile TransactionOutboxEntry blocked;

private final CopyOnWriteArrayList<TransactionOutboxEntry> orderedEntries;
private final CopyOnWriteArrayList<TransactionOutboxEntry> events = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<TransactionOutboxEntry> successes =
new CopyOnWriteArrayList<>();

OrderedEntryListener(CountDownLatch successLatch, CountDownLatch blockedLatch) {
this.successLatch = successLatch;
this.blockedLatch = blockedLatch;
orderedEntries = new CopyOnWriteArrayList<>();
}

@Override
public void scheduled(TransactionOutboxEntry entry) {
orderedEntries.add(from(entry));
events.add(from(entry));
}

@Override
public void success(TransactionOutboxEntry entry) {
orderedEntries.add(from(entry));
var copy = from(entry);
events.add(copy);
successes.add(copy);
log.info(
"Received success #{}. Counting down at {}", successes.size(), successLatch.getCount());
successLatch.countDown();
}

@Override
public void failure(TransactionOutboxEntry entry, Throwable cause) {
orderedEntries.add(from(entry));
events.add(from(entry));
}

@Override
Expand All @@ -50,28 +57,21 @@ public void blocked(TransactionOutboxEntry entry, Throwable cause) {
}

/**
* Retrieve an unmodifiable copy of {@link #orderedEntries}. Beware, expectation is that this does
* not/ should not get accessed until the correct number of {@link
* #success(TransactionOutboxEntry)} and {@link #blocked(TransactionOutboxEntry, Throwable)}}
* counts have occurred.
* Retrieve an unmodifiable copy of {@link #events}. Beware, expectation is that this does not/
* should not get accessed until the correct number of {@link #success(TransactionOutboxEntry)}
* and {@link #blocked(TransactionOutboxEntry, Throwable)}} counts have occurred.
*
* @return unmodifiable list of ordered outbox entry events.
*/
public List<TransactionOutboxEntry> getOrderedEntries() {
return List.copyOf(orderedEntries);
public List<TransactionOutboxEntry> getEvents() {
return List.copyOf(events);
}

public List<TransactionOutboxEntry> getSuccesses() {
return List.copyOf(successes);
}

private TransactionOutboxEntry from(TransactionOutboxEntry entry) {
return TransactionOutboxEntry.builder()
.id(entry.getId())
.uniqueRequestId(entry.getUniqueRequestId())
.invocation(entry.getInvocation())
.lastAttemptTime(entry.getLastAttemptTime())
.nextAttemptTime(entry.getNextAttemptTime())
.attempts(entry.getAttempts())
.blocked(entry.isBlocked())
.processed(entry.isProcessed())
.version(entry.getVersion())
.build();
return entry.toBuilder().build();
}
}

0 comments on commit 51d523f

Please sign in to comment.