From dfc82d227f52fdd70123a095d04c5cb260cac465 Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Wed, 26 Oct 2022 15:10:53 +0100 Subject: [PATCH 1/6] Split `EventHandler` interface into two for rewindable and non-rewindable Using `EventHandler::setSequenceCallback` with the newly added concept of Rewinding in the `BatchEventProcessor` is not going to go well as an `EventHandler` can change the sequence before a rewind. Non-rewindable `EventHandler` implementations are not able to throw a `RewindableException`. --- .../com/lmax/disruptor/BaseEventHandler.java | 71 ++++++++++++ .../lmax/disruptor/BatchEventProcessor.java | 101 ++++++++++++++---- .../java/com/lmax/disruptor/EventHandler.java | 41 +------ .../disruptor/RewindableEventHandler.java | 43 ++++++++ .../lmax/disruptor/RewindableException.java | 2 +- .../com/lmax/disruptor/dsl/Disruptor.java | 4 +- .../disruptor/dsl/EventProcessorInfo.java | 2 +- .../disruptor/primitive/LongRingBuffer.java | 61 ----------- .../RewindBatchEventProcessorTest.java | 4 +- 9 files changed, 205 insertions(+), 124 deletions(-) create mode 100644 src/main/java/com/lmax/disruptor/BaseEventHandler.java create mode 100644 src/main/java/com/lmax/disruptor/RewindableEventHandler.java delete mode 100644 src/perftest/java/com/lmax/disruptor/primitive/LongRingBuffer.java diff --git a/src/main/java/com/lmax/disruptor/BaseEventHandler.java b/src/main/java/com/lmax/disruptor/BaseEventHandler.java new file mode 100644 index 000000000..64eb54fe1 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/BaseEventHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 2022 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +interface BaseEventHandler +{ + /** + * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will + * read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be + * processed without having to wait for any new event to arrive. This can be useful for event handlers that need + * to do slower operations like I/O as they can group together the data from multiple events into a single + * operation. Implementations should ensure that the operation is always performed when endOfBatch is true as + * the time between that message and the next one is indeterminate. + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer} + * @throws Throwable if the EventHandler would like the exception handled further up the chain or possible rewind + * the batch if a {@link RewindableException} is thrown. + */ + void onEvent(T event, long sequence, boolean endOfBatch) throws Throwable; + + /** + * Invoked by {@link BatchEventProcessor} prior to processing a batch of events + * + * @param batchSize the size of the batch that is starting + */ + default void onBatchStart(long batchSize) + { + } + + /** + * Called once on thread start before first event is available. + */ + default void onStart() + { + } + + /** + * Called once just before the event processing thread is shutdown. + * + *

Sequence event processing will already have stopped before this method is called. No events will + * be processed after this message. + */ + default void onShutdown() + { + } + + /** + * Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}. + * + * @param sequence - the last processed sequence. + * @throws Exception if the implementation is unable to handle this timeout. + */ + default void onTimeout(long sequence) throws Exception + { + } +} diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 6bfb56924..60eac1ada 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2011 LMAX Ltd. + * Copyright 2022 LMAX Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. */ public final class BatchEventProcessor - implements EventProcessor + implements EventProcessor { private static final int IDLE = 0; private static final int HALTED = IDLE + 1; @@ -38,40 +38,78 @@ public final class BatchEventProcessor private ExceptionHandler exceptionHandler; private final DataProvider dataProvider; private final SequenceBarrier sequenceBarrier; - private final EventHandler eventHandler; + private final BaseEventHandler eventHandler; private final int batchLimitOffset; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy(); private int retriesAttempted = 0; + private final boolean rewindable; + + private BatchEventProcessor( + final DataProvider dataProvider, + final SequenceBarrier sequenceBarrier, + final BaseEventHandler eventHandler, + final int maxBatchSize, + final boolean rewindable + ) + { + this.dataProvider = dataProvider; + this.sequenceBarrier = sequenceBarrier; + this.eventHandler = eventHandler; + + if (maxBatchSize < 1) + { + throw new IllegalArgumentException("maxBatchSize must be greater than 0"); + } + this.batchLimitOffset = maxBatchSize - 1; + + this.rewindable = rewindable; + } /** * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. * + *

This constructor will not support rewinding batches. + * * @param dataProvider to which events are published. * @param sequenceBarrier on which it is waiting. * @param eventHandler is the delegate to which events are dispatched. * @param maxBatchSize limits number of events processed in a batch before updating the sequence. */ public BatchEventProcessor( - final DataProvider dataProvider, - final SequenceBarrier sequenceBarrier, - final EventHandler eventHandler, - final int maxBatchSize + final DataProvider dataProvider, + final SequenceBarrier sequenceBarrier, + final EventHandler eventHandler, + final int maxBatchSize ) { - this.dataProvider = dataProvider; - this.sequenceBarrier = sequenceBarrier; - this.eventHandler = eventHandler; - if (maxBatchSize < 1) - { - throw new IllegalArgumentException("maxBatchSize must be greater than 0"); - } - this.batchLimitOffset = maxBatchSize - 1; + this(dataProvider, sequenceBarrier, eventHandler, maxBatchSize, false); eventHandler.setSequenceCallback(sequence); } + /** + * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when + * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. + * + *

This constructor will support rewinding batches. + * + * @param dataProvider to which events are published. + * @param sequenceBarrier on which it is waiting. + * @param rewindableEventHandler is the delegate to which events are dispatched. + * @param maxBatchSize limits number of events processed in a batch before updating the sequence. + */ + public BatchEventProcessor( + final DataProvider dataProvider, + final SequenceBarrier sequenceBarrier, + final RewindableEventHandler rewindableEventHandler, + final int maxBatchSize + ) + { + this(dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, true); + } + /** * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. @@ -89,6 +127,23 @@ public BatchEventProcessor( this(dataProvider, sequenceBarrier, eventHandler, Integer.MAX_VALUE); } + /** + * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when + * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. + * + * @param dataProvider to which events are published. + * @param sequenceBarrier on which it is waiting. + * @param rewindableEventHandler is the delegate to which events are dispatched. + */ + public BatchEventProcessor( + final DataProvider dataProvider, + final SequenceBarrier sequenceBarrier, + final RewindableEventHandler rewindableEventHandler + ) + { + this(dataProvider, sequenceBarrier, rewindableEventHandler, Integer.MAX_VALUE); + } + @Override public Sequence getSequence() { @@ -128,6 +183,7 @@ public void setExceptionHandler(final ExceptionHandler exceptionHandl * Which can include whether the batch should be rewound and reattempted, * or simply thrown and move on to the next sequence * the default is a {@link SimpleBatchRewindStrategy} which always rewinds + * * @param batchRewindStrategy to replace the existing rewindStrategy. */ public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy) @@ -213,14 +269,21 @@ private void processEvents() } catch (final RewindableException e) { - if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND) + if (this.rewindable) { - nextSequence = startOfBatchSequence; + if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND) + { + nextSequence = startOfBatchSequence; + } + else + { + retriesAttempted = 0; + throw e; + } } else { - retriesAttempted = 0; - throw e; + throw new RuntimeException("Rewindable Exception thrown from a non-rewindable event handler", e); } } } diff --git a/src/main/java/com/lmax/disruptor/EventHandler.java b/src/main/java/com/lmax/disruptor/EventHandler.java index f0d5b21f5..b30192c11 100644 --- a/src/main/java/com/lmax/disruptor/EventHandler.java +++ b/src/main/java/com/lmax/disruptor/EventHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2011 LMAX Ltd. + * Copyright 2022 LMAX Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler. */ -public interface EventHandler +public interface EventHandler extends BaseEventHandler { /** * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will @@ -36,34 +36,9 @@ public interface EventHandler * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer} * @throws Exception if the EventHandler would like the exception handled further up the chain. */ + @Override void onEvent(T event, long sequence, boolean endOfBatch) throws Exception; - /** - * Invoked by {@link BatchEventProcessor} prior to processing a batch of events - * - * @param batchSize the size of the batch that is starting - */ - default void onBatchStart(long batchSize) - { - } - - /** - * Called once on thread start before first event is available. - */ - default void onStart() - { - } - - /** - * Called once just before the event processing thread is shutdown. - * - *

Sequence event processing will already have stopped before this method is called. No events will - * be processed after this message. - */ - default void onShutdown() - { - } - /** * Used by the {@link BatchEventProcessor} to set a callback allowing the {@link EventHandler} to notify * when it has finished consuming an event if this happens after the {@link EventHandler#onEvent(Object, long, boolean)} call. @@ -77,14 +52,4 @@ default void onShutdown() default void setSequenceCallback(Sequence sequenceCallback) { } - - /** - * Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}. - * - * @param sequence - the last processed sequence. - * @throws Exception if the implementation is unable to handle this timeout. - */ - default void onTimeout(long sequence) throws Exception - { - } } diff --git a/src/main/java/com/lmax/disruptor/RewindableEventHandler.java b/src/main/java/com/lmax/disruptor/RewindableEventHandler.java new file mode 100644 index 000000000..b485dc256 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/RewindableEventHandler.java @@ -0,0 +1,43 @@ +/* + * Copyright 2022 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lmax.disruptor; + +/** + * Callback interface to be implemented for processing events as they become available in the {@link RingBuffer} + * with support for throwing a {@link RewindableException} when an even cannot be processed currently but may succeed on retry. + * + * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. + * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler. + */ +public interface RewindableEventHandler extends BaseEventHandler +{ + /** + * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will + * read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be + * processed without having to wait for any new event to arrive. This can be useful for event handlers that need + * to do slower operations like I/O as they can group together the data from multiple events into a single + * operation. Implementations should ensure that the operation is always performed when endOfBatch is true as + * the time between that message and the next one is indeterminate. + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer} + * @throws RewindableException if the EventHandler would like the batch event processor to process the entire batch again. + * @throws Exception if the EventHandler would like the exception handled further up the chain. + */ + @Override + void onEvent(T event, long sequence, boolean endOfBatch) throws RewindableException, Exception; +} diff --git a/src/main/java/com/lmax/disruptor/RewindableException.java b/src/main/java/com/lmax/disruptor/RewindableException.java index 2b5550cb3..358ef4af0 100644 --- a/src/main/java/com/lmax/disruptor/RewindableException.java +++ b/src/main/java/com/lmax/disruptor/RewindableException.java @@ -5,7 +5,7 @@ * On throwing this exception the {@link BatchEventProcessor} can choose to rewind and replay the batch or throw * depending on the {@link BatchRewindStrategy} */ -public class RewindableException extends RuntimeException +public class RewindableException extends Throwable { /** * @param cause The underlying cause of the exception. diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java index 3822c911b..3b2f804a1 100644 --- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java +++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java @@ -230,7 +230,7 @@ public ExceptionHandlerSetting handleExceptionsFor(final EventHandler even * *

dw.after(A).handleEventsWith(B);
* - * @param handlers the event handlers, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventHandler[])}, + * @param handlers the event handlers, previously set up with {@link #handleEventsWith(EventHandler[])}, * that will form the barrier for subsequent handlers or processors. * @return an {@link EventHandlerGroup} that can be used to setup a dependency barrier over the specified event handlers. */ @@ -253,7 +253,7 @@ public final EventHandlerGroup after(final EventHandler... handlers) * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)}, * that will form the barrier for subsequent handlers or processors. * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors. - * @see #after(com.lmax.disruptor.EventHandler[]) + * @see #after(EventHandler[]) */ public EventHandlerGroup after(final EventProcessor... processors) { diff --git a/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java b/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java index bd34d6b8e..0e967551f 100644 --- a/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java +++ b/src/main/java/com/lmax/disruptor/dsl/EventProcessorInfo.java @@ -37,7 +37,7 @@ class EventProcessorInfo implements ConsumerInfo private boolean endOfChain = true; EventProcessorInfo( - final EventProcessor eventprocessor, final EventHandler handler, final SequenceBarrier barrier) + final EventProcessor eventprocessor, final EventHandler handler, final SequenceBarrier barrier) { this.eventprocessor = eventprocessor; this.handler = handler; diff --git a/src/perftest/java/com/lmax/disruptor/primitive/LongRingBuffer.java b/src/perftest/java/com/lmax/disruptor/primitive/LongRingBuffer.java deleted file mode 100644 index 600a9d2b8..000000000 --- a/src/perftest/java/com/lmax/disruptor/primitive/LongRingBuffer.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.lmax.disruptor.primitive; - -import com.lmax.disruptor.BatchEventProcessor; -import com.lmax.disruptor.DataProvider; -import com.lmax.disruptor.Sequencer; - -public class LongRingBuffer -{ - private final Sequencer sequencer; - private final long[] buffer; - private final int mask; - - public LongRingBuffer(final Sequencer sequencer) - { - this.sequencer = sequencer; - this.buffer = new long[sequencer.getBufferSize()]; - this.mask = sequencer.getBufferSize() - 1; - } - - private int index(final long sequence) - { - return (int) sequence & mask; - } - - public void put(final long e) - { - final long next = sequencer.next(); - buffer[index(next)] = e; - sequencer.publish(next); - } - - public interface LongHandler - { - void onEvent(long value, long sequence, boolean endOfBatch); - } - - private class LongEvent implements DataProvider - { - private long sequence; - - public long get() - { - return buffer[index(sequence)]; - } - - @Override - public LongEvent get(final long sequence) - { - this.sequence = sequence; - return this; - } - } - - public BatchEventProcessor createProcessor(final LongHandler handler) - { - return new BatchEventProcessor<>( - new LongEvent(), - sequencer.newBarrier(), - (event, sequence, endOfBatch) -> handler.onEvent(event.get(), sequence, endOfBatch)); - } -} diff --git a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java index 934f993e0..3365bff98 100644 --- a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java @@ -425,7 +425,7 @@ private BatchEventProcessor create(final TestEventHandler eventHandle eventHandler); } - private final class TestEventHandler implements EventHandler + private final class TestEventHandler implements RewindableEventHandler { private final List values; private BatchEventProcessor processor; @@ -452,7 +452,7 @@ public void setRewindable(final BatchEventProcessor processor) } @Override - public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws Exception + public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws RewindableException, Exception { if (sequence == nonRewindableErrorSequence) From 922d02f3e6d09032532bdd1c1a1ea07f12789706 Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Mon, 31 Oct 2022 12:29:40 +0000 Subject: [PATCH 2/6] Javadoc correctness --- src/main/java/com/lmax/disruptor/BatchEventProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 60eac1ada..fb66a0399 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -70,7 +70,7 @@ private BatchEventProcessor( * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. * - *

This constructor will not support rewinding batches. + *

The created {@link BatchEventProcessor} will not support batch rewind. * * @param dataProvider to which events are published. * @param sequenceBarrier on which it is waiting. @@ -93,7 +93,7 @@ public BatchEventProcessor( * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. * - *

This constructor will support rewinding batches. + *

The created {@link BatchEventProcessor} will support batch rewind. * * @param dataProvider to which events are published. * @param sequenceBarrier on which it is waiting. From 256e4f8c9f4aaff44c118933a60bf3a639b704fc Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Mon, 31 Oct 2022 12:30:35 +0000 Subject: [PATCH 3/6] BaseEventHandler -> EventHandlerBase --- src/main/java/com/lmax/disruptor/BatchEventProcessor.java | 4 ++-- src/main/java/com/lmax/disruptor/EventHandler.java | 2 +- .../{BaseEventHandler.java => EventHandlerBase.java} | 2 +- src/main/java/com/lmax/disruptor/RewindableEventHandler.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) rename src/main/java/com/lmax/disruptor/{BaseEventHandler.java => EventHandlerBase.java} (98%) diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index fb66a0399..46dffcace 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -38,7 +38,7 @@ public final class BatchEventProcessor private ExceptionHandler exceptionHandler; private final DataProvider dataProvider; private final SequenceBarrier sequenceBarrier; - private final BaseEventHandler eventHandler; + private final EventHandlerBase eventHandler; private final int batchLimitOffset; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy(); @@ -48,7 +48,7 @@ public final class BatchEventProcessor private BatchEventProcessor( final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, - final BaseEventHandler eventHandler, + final EventHandlerBase eventHandler, final int maxBatchSize, final boolean rewindable ) diff --git a/src/main/java/com/lmax/disruptor/EventHandler.java b/src/main/java/com/lmax/disruptor/EventHandler.java index b30192c11..c23306df7 100644 --- a/src/main/java/com/lmax/disruptor/EventHandler.java +++ b/src/main/java/com/lmax/disruptor/EventHandler.java @@ -21,7 +21,7 @@ * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler. */ -public interface EventHandler extends BaseEventHandler +public interface EventHandler extends EventHandlerBase { /** * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will diff --git a/src/main/java/com/lmax/disruptor/BaseEventHandler.java b/src/main/java/com/lmax/disruptor/EventHandlerBase.java similarity index 98% rename from src/main/java/com/lmax/disruptor/BaseEventHandler.java rename to src/main/java/com/lmax/disruptor/EventHandlerBase.java index 64eb54fe1..2ed0db9f6 100644 --- a/src/main/java/com/lmax/disruptor/BaseEventHandler.java +++ b/src/main/java/com/lmax/disruptor/EventHandlerBase.java @@ -15,7 +15,7 @@ */ package com.lmax.disruptor; -interface BaseEventHandler +interface EventHandlerBase { /** * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will diff --git a/src/main/java/com/lmax/disruptor/RewindableEventHandler.java b/src/main/java/com/lmax/disruptor/RewindableEventHandler.java index b485dc256..058828b30 100644 --- a/src/main/java/com/lmax/disruptor/RewindableEventHandler.java +++ b/src/main/java/com/lmax/disruptor/RewindableEventHandler.java @@ -22,7 +22,7 @@ * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler. */ -public interface RewindableEventHandler extends BaseEventHandler +public interface RewindableEventHandler extends EventHandlerBase { /** * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will From 60134897b3fab19324eeec30d20a58e27d39d469 Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Sat, 18 Mar 2023 16:57:32 +0000 Subject: [PATCH 4/6] Try behaviour over booleans --- .../examples/DynamicallyAddHandler.java | 5 +- .../lmax/disruptor/BatchEventProcessor.java | 136 +++++------------- .../disruptor/BatchEventProcessorBuilder.java | 80 +++++++++++ .../com/lmax/disruptor/RewindHandler.java | 22 +++ .../com/lmax/disruptor/dsl/Disruptor.java | 5 +- .../disruptor/immutable/CustomRingBuffer.java | 7 +- .../immutable/SimplePerformanceTest.java | 3 +- .../OneToOneOffHeapThroughputTest.java | 3 +- .../offheap/OneToOneOnHeapThroughputTest.java | 3 +- .../OneToOneSequencedBatchThroughputTest.java | 3 +- ...ToOneSequencedLongArrayThroughputTest.java | 3 +- .../OneToOneSequencedThroughputTest.java | 3 +- ...ToThreeDiamondSequencedThroughputTest.java | 7 +- ...oThreePipelineSequencedThroughputTest.java | 7 +- .../OneToThreeSequencedThroughputTest.java | 7 +- .../PingPongSequencedLatencyTest.java | 5 +- ...hreeToOneSequencedBatchThroughputTest.java | 3 +- .../ThreeToOneSequencedThroughputTest.java | 3 +- .../disruptor/BatchEventProcessorTest.java | 29 ++-- .../lmax/disruptor/LifecycleAwareTest.java | 3 +- .../MaxBatchSizeEventProcessorTest.java | 5 +- .../RewindBatchEventProcessorTest.java | 6 +- .../SequenceReportingCallbackTest.java | 4 +- .../com/lmax/disruptor/dsl/DisruptorTest.java | 110 +++++++------- 24 files changed, 262 insertions(+), 200 deletions(-) create mode 100644 src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java create mode 100644 src/main/java/com/lmax/disruptor/RewindHandler.java diff --git a/src/examples/java/com/lmax/disruptor/examples/DynamicallyAddHandler.java b/src/examples/java/com/lmax/disruptor/examples/DynamicallyAddHandler.java index fd24d95e1..8932a7152 100644 --- a/src/examples/java/com/lmax/disruptor/examples/DynamicallyAddHandler.java +++ b/src/examples/java/com/lmax/disruptor/examples/DynamicallyAddHandler.java @@ -1,6 +1,7 @@ package com.lmax.disruptor.examples; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; @@ -52,11 +53,11 @@ public static void main(final String[] args) throws InterruptedException // Construct 2 batch event processors. DynamicHandler handler1 = new DynamicHandler(); BatchEventProcessor processor1 = - new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1); + new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), handler1); DynamicHandler handler2 = new DynamicHandler(); BatchEventProcessor processor2 = - new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2); + new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2); // Dynamically add both sequences to the ring buffer ringBuffer.addGatingSequences(processor1.getSequence(), processor2.getSequence()); diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 46dffcace..0e64cff8e 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -41,16 +41,15 @@ public final class BatchEventProcessor private final EventHandlerBase eventHandler; private final int batchLimitOffset; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + private final RewindHandler rewindHandler; private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy(); private int retriesAttempted = 0; - private final boolean rewindable; - private BatchEventProcessor( + BatchEventProcessor( final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, final EventHandlerBase eventHandler, - final int maxBatchSize, - final boolean rewindable + final int maxBatchSize ) { this.dataProvider = dataProvider; @@ -63,85 +62,9 @@ private BatchEventProcessor( } this.batchLimitOffset = maxBatchSize - 1; - this.rewindable = rewindable; - } - - /** - * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when - * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. - * - *

The created {@link BatchEventProcessor} will not support batch rewind. - * - * @param dataProvider to which events are published. - * @param sequenceBarrier on which it is waiting. - * @param eventHandler is the delegate to which events are dispatched. - * @param maxBatchSize limits number of events processed in a batch before updating the sequence. - */ - public BatchEventProcessor( - final DataProvider dataProvider, - final SequenceBarrier sequenceBarrier, - final EventHandler eventHandler, - final int maxBatchSize - ) - { - this(dataProvider, sequenceBarrier, eventHandler, maxBatchSize, false); - - eventHandler.setSequenceCallback(sequence); - } - - /** - * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when - * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. - * - *

The created {@link BatchEventProcessor} will support batch rewind. - * - * @param dataProvider to which events are published. - * @param sequenceBarrier on which it is waiting. - * @param rewindableEventHandler is the delegate to which events are dispatched. - * @param maxBatchSize limits number of events processed in a batch before updating the sequence. - */ - public BatchEventProcessor( - final DataProvider dataProvider, - final SequenceBarrier sequenceBarrier, - final RewindableEventHandler rewindableEventHandler, - final int maxBatchSize - ) - { - this(dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, true); - } - - /** - * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when - * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. - * - * @param dataProvider to which events are published. - * @param sequenceBarrier on which it is waiting. - * @param eventHandler is the delegate to which events are dispatched. - */ - public BatchEventProcessor( - final DataProvider dataProvider, - final SequenceBarrier sequenceBarrier, - final EventHandler eventHandler - ) - { - this(dataProvider, sequenceBarrier, eventHandler, Integer.MAX_VALUE); - } - - /** - * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when - * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. - * - * @param dataProvider to which events are published. - * @param sequenceBarrier on which it is waiting. - * @param rewindableEventHandler is the delegate to which events are dispatched. - */ - public BatchEventProcessor( - final DataProvider dataProvider, - final SequenceBarrier sequenceBarrier, - final RewindableEventHandler rewindableEventHandler - ) - { - this(dataProvider, sequenceBarrier, rewindableEventHandler, Integer.MAX_VALUE); + this.rewindHandler = eventHandler instanceof RewindableEventHandler + ? new TryRewindHandler() + : new NoRewindHandler(); } @Override @@ -251,9 +174,9 @@ private void processEvents() final long availableSequence = sequenceBarrier.waitFor(nextSequence); final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence); - if (endOfBatchSequence >= nextSequence) + if (nextSequence <= endOfBatchSequence) { - eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1); + eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1); } while (nextSequence <= endOfBatchSequence) @@ -269,22 +192,7 @@ private void processEvents() } catch (final RewindableException e) { - if (this.rewindable) - { - if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND) - { - nextSequence = startOfBatchSequence; - } - else - { - retriesAttempted = 0; - throw e; - } - } - else - { - throw new RuntimeException("Rewindable Exception thrown from a non-rewindable event handler", e); - } + nextSequence = rewindHandler.attemptRewindGetNextSequence(e, startOfBatchSequence); } } catch (final TimeoutException e) @@ -387,4 +295,30 @@ private ExceptionHandler getExceptionHandler() ExceptionHandler handler = exceptionHandler; return handler == null ? ExceptionHandlers.defaultHandler() : handler; } + + private class TryRewindHandler implements RewindHandler + { + @Override + public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException + { + if (batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND) + { + return startOfBatchSequence; + } + else + { + retriesAttempted = 0; + throw e; + } + } + } + + private static class NoRewindHandler implements RewindHandler + { + @Override + public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) + { + throw new UnsupportedOperationException("Rewindable Exception thrown from a non-rewindable event handler", e); + } + } } \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java b/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java new file mode 100644 index 000000000..77e8ff9ae --- /dev/null +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java @@ -0,0 +1,80 @@ +/* + * Copyright 2023 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.lmax.disruptor; + +public final class BatchEventProcessorBuilder +{ + private int maxBatchSize = Integer.MAX_VALUE; + + /** + * Set the maximum number of events that will be processed in a batch before updating the sequence. + * + * @param maxBatchSize max number of events to process in one batch. + * @return The builder + */ + public BatchEventProcessorBuilder setMaxBatchSize(final int maxBatchSize) + { + this.maxBatchSize = maxBatchSize; + return this; + } + + /** + * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when + * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. + * + *

The created {@link BatchEventProcessor} will not support batch rewind, + * but {@link EventHandler#setSequenceCallback(Sequence)} will be supported. + * + * @param dataProvider to which events are published. + * @param sequenceBarrier on which it is waiting. + * @param eventHandler is the delegate to which events are dispatched. + * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. + * @return the BatchEventProcessor + */ + public BatchEventProcessor build( + final DataProvider dataProvider, + final SequenceBarrier sequenceBarrier, + final EventHandler eventHandler) + { + final BatchEventProcessor processor = new BatchEventProcessor<>( + dataProvider, sequenceBarrier, eventHandler, maxBatchSize + ); + eventHandler.setSequenceCallback(processor.getSequence()); + + return processor; + } + + /** + * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when + * the {@link EventHandler#onEvent(Object, long, boolean)} method returns. + * + * @param dataProvider to which events are published. + * @param sequenceBarrier on which it is waiting. + * @param rewindableEventHandler is the delegate to which events are dispatched. + * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. + * @return the BatchEventProcessor + */ + public BatchEventProcessor build( + final DataProvider dataProvider, + final SequenceBarrier sequenceBarrier, + final RewindableEventHandler rewindableEventHandler) + { + return new BatchEventProcessor<>( + dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize + ); + } +} diff --git a/src/main/java/com/lmax/disruptor/RewindHandler.java b/src/main/java/com/lmax/disruptor/RewindHandler.java new file mode 100644 index 000000000..f0b9a23ed --- /dev/null +++ b/src/main/java/com/lmax/disruptor/RewindHandler.java @@ -0,0 +1,22 @@ +/* + * Copyright 2023 LMAX Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.lmax.disruptor; + +public interface RewindHandler +{ + long attemptRewindGetNextSequence(RewindableException e, long startOfBatchSequence) throws RewindableException; +} diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java index 3b2f804a1..93ca6c8cd 100644 --- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java +++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java @@ -16,6 +16,7 @@ package com.lmax.disruptor.dsl; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventProcessor; @@ -501,8 +502,8 @@ EventHandlerGroup createEventProcessors( { final EventHandler eventHandler = eventHandlers[i]; - final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() + .build(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { diff --git a/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java b/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java index 6374caeb4..91ba5d57d 100644 --- a/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java +++ b/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java @@ -1,6 +1,7 @@ package com.lmax.disruptor.immutable; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.DataProvider; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.Sequencer; @@ -79,10 +80,8 @@ public EventAccessor get(final long sequence) public BatchEventProcessor> createHandler(final EventHandler handler) { BatchEventProcessor> processor = - new BatchEventProcessor<>( - this, - sequencer.newBarrier(), - new AccessorEventHandler<>(handler)); + new BatchEventProcessorBuilder() + .build(this, sequencer.newBarrier(), new AccessorEventHandler<>(handler)); sequencer.addGatingSequences(processor.getSequence()); return processor; diff --git a/src/perftest/java/com/lmax/disruptor/immutable/SimplePerformanceTest.java b/src/perftest/java/com/lmax/disruptor/immutable/SimplePerformanceTest.java index 8c4394d6d..3e2cbfba5 100644 --- a/src/perftest/java/com/lmax/disruptor/immutable/SimplePerformanceTest.java +++ b/src/perftest/java/com/lmax/disruptor/immutable/SimplePerformanceTest.java @@ -1,6 +1,7 @@ package com.lmax.disruptor.immutable; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; @@ -33,7 +34,7 @@ public void run() private void doRun() throws InterruptedException { BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>( + new BatchEventProcessorBuilder().build( ringBuffer, ringBuffer.newBarrier(), eventHolderHandler); diff --git a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java index ec11f4e70..7707dab35 100644 --- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java @@ -2,6 +2,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.DataProvider; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.PerfTestContext; @@ -34,7 +35,7 @@ public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor new OffHeapRingBuffer(new SingleProducerSequencer(BUFFER_SIZE, waitStrategy), BLOCK_SIZE); private final ByteBufferHandler handler = new ByteBufferHandler(); private final BatchEventProcessor processor = - new BatchEventProcessor<>(buffer, buffer.newBarrier(), handler); + new BatchEventProcessorBuilder().build(buffer, buffer.newBarrier(), handler); { buffer.addGatingSequences(processor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java index f25885f00..a976cadcd 100644 --- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java @@ -2,6 +2,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.PerfTestContext; @@ -34,7 +35,7 @@ public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor BUFFER_SIZE, waitStrategy); private final ByteBufferHandler handler = new ByteBufferHandler(); private final BatchEventProcessor processor = - new BatchEventProcessor<>(buffer, buffer.newBarrier(), handler); + new BatchEventProcessorBuilder().build(buffer, buffer.newBarrier(), handler); { buffer.addGatingSequences(processor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java index 453f39f7f..603749edc 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedBatchThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; @@ -77,7 +78,7 @@ public final class OneToOneSequencedBatchThroughputTest extends AbstractPerfTest private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handler); { ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedLongArrayThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedLongArrayThroughputTest.java index b057f0dec..98d0c5de6 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedLongArrayThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedLongArrayThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; @@ -76,7 +77,7 @@ public final class OneToOneSequencedLongArrayThroughputTest extends AbstractPerf private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final LongArrayEventHandler handler = new LongArrayEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handler); { ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java index a85e839db..3cf9eb509 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; @@ -76,7 +77,7 @@ public final class OneToOneSequencedThroughputTest extends AbstractPerfTestDisru private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handler); { ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeDiamondSequencedThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeDiamondSequencedThroughputTest.java index b01e4fa56..65bad3820 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeDiamondSequencedThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeDiamondSequencedThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; @@ -113,18 +114,18 @@ public final class OneToThreeDiamondSequencedThroughputTest extends AbstractPerf private final FizzBuzzEventHandler fizzHandler = new FizzBuzzEventHandler(FizzBuzzStep.FIZZ); private final BatchEventProcessor batchProcessorFizz = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, fizzHandler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, fizzHandler); private final FizzBuzzEventHandler buzzHandler = new FizzBuzzEventHandler(FizzBuzzStep.BUZZ); private final BatchEventProcessor batchProcessorBuzz = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, buzzHandler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, buzzHandler); private final SequenceBarrier sequenceBarrierFizzBuzz = ringBuffer.newBarrier(batchProcessorFizz.getSequence(), batchProcessorBuzz.getSequence()); private final FizzBuzzEventHandler fizzBuzzHandler = new FizzBuzzEventHandler(FizzBuzzStep.FIZZ_BUZZ); private final BatchEventProcessor batchProcessorFizzBuzz = - new BatchEventProcessor<>(ringBuffer, sequenceBarrierFizzBuzz, fizzBuzzHandler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrierFizzBuzz, fizzBuzzHandler); { ringBuffer.addGatingSequences(batchProcessorFizzBuzz.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreePipelineSequencedThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreePipelineSequencedThroughputTest.java index a736a3f89..f0981057b 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreePipelineSequencedThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreePipelineSequencedThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; @@ -101,17 +102,17 @@ public final class OneToThreePipelineSequencedThroughputTest extends AbstractPer private final SequenceBarrier stepOneSequenceBarrier = ringBuffer.newBarrier(); private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE); private final BatchEventProcessor stepOneBatchProcessor = - new BatchEventProcessor<>(ringBuffer, stepOneSequenceBarrier, stepOneFunctionHandler); + new BatchEventProcessorBuilder().build(ringBuffer, stepOneSequenceBarrier, stepOneFunctionHandler); private final SequenceBarrier stepTwoSequenceBarrier = ringBuffer.newBarrier(stepOneBatchProcessor.getSequence()); private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO); private final BatchEventProcessor stepTwoBatchProcessor = - new BatchEventProcessor<>(ringBuffer, stepTwoSequenceBarrier, stepTwoFunctionHandler); + new BatchEventProcessorBuilder().build(ringBuffer, stepTwoSequenceBarrier, stepTwoFunctionHandler); private final SequenceBarrier stepThreeSequenceBarrier = ringBuffer.newBarrier(stepTwoBatchProcessor.getSequence()); private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE); private final BatchEventProcessor stepThreeBatchProcessor = - new BatchEventProcessor<>(ringBuffer, stepThreeSequenceBarrier, stepThreeFunctionHandler); + new BatchEventProcessorBuilder().build(ringBuffer, stepThreeSequenceBarrier, stepThreeFunctionHandler); { ringBuffer.addGatingSequences(stepThreeBatchProcessor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeSequencedThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeSequencedThroughputTest.java index 8d162d55e..18812a11f 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeSequencedThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/OneToThreeSequencedThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; @@ -109,9 +110,9 @@ public final class OneToThreeSequencedThroughputTest extends AbstractPerfTestDis private final BatchEventProcessor[] batchEventProcessors = new BatchEventProcessor[NUM_EVENT_PROCESSORS]; { - batchEventProcessors[0] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handlers[0]); - batchEventProcessors[1] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handlers[1]); - batchEventProcessors[2] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handlers[2]); + batchEventProcessors[0] = new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handlers[0]); + batchEventProcessors[1] = new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handlers[1]); + batchEventProcessors[2] = new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handlers[2]); ringBuffer.addGatingSequences( batchEventProcessors[0].getSequence(), diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java index 95051cd6c..bfdedf166 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/PingPongSequencedLatencyTest.java @@ -16,6 +16,7 @@ package com.lmax.disruptor.sequenced; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; @@ -85,12 +86,12 @@ public final class PingPongSequencedLatencyTest private final SequenceBarrier pongBarrier = pongBuffer.newBarrier(); private final Pinger pinger = new Pinger(pingBuffer, ITERATIONS, PAUSE_NANOS); private final BatchEventProcessor pingProcessor = - new BatchEventProcessor<>(pongBuffer, pongBarrier, pinger); + new BatchEventProcessorBuilder().build(pongBuffer, pongBarrier, pinger); private final SequenceBarrier pingBarrier = pingBuffer.newBarrier(); private final Ponger ponger = new Ponger(pongBuffer); private final BatchEventProcessor pongProcessor = - new BatchEventProcessor<>(pingBuffer, pingBarrier, ponger); + new BatchEventProcessorBuilder().build(pingBuffer, pingBarrier, ponger); { pingBuffer.addGatingSequences(pongProcessor.getSequence()); diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedBatchThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedBatchThroughputTest.java index d91e2fc11..6c7891abf 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedBatchThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedBatchThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; @@ -96,7 +97,7 @@ public final class ThreeToOneSequencedBatchThroughputTest extends AbstractPerfTe private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handler); private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS]; { diff --git a/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedThroughputTest.java b/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedThroughputTest.java index e4ee0f838..529c73333 100644 --- a/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/sequenced/ThreeToOneSequencedThroughputTest.java @@ -17,6 +17,7 @@ import com.lmax.disruptor.AbstractPerfTestDisruptor; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.PerfTestContext; import com.lmax.disruptor.RingBuffer; @@ -96,7 +97,7 @@ public final class ThreeToOneSequencedThroughputTest extends AbstractPerfTestDis private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handler); private final ValuePublisher[] valuePublishers = new ValuePublisher[NUM_PUBLISHERS]; { diff --git a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java index 8153533fd..b63d5a45c 100644 --- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java @@ -44,8 +44,8 @@ public void shouldThrowExceptionOnSettingNullExceptionHandler() { assertThrows(NullPointerException.class, () -> { - final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>( - ringBuffer, sequenceBarrier, new ExceptionEventHandler()); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, new ExceptionEventHandler()); batchEventProcessor.setExceptionHandler(null); }); } @@ -56,8 +56,8 @@ public void shouldCallMethodsInLifecycleOrderForBatch() { CountDownLatch eventLatch = new CountDownLatch(3); LatchEventHandler eventHandler = new LatchEventHandler(eventLatch); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>( - ringBuffer, sequenceBarrier, eventHandler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, eventHandler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); @@ -80,8 +80,8 @@ public void shouldCallExceptionHandlerOnUncaughtException() { CountDownLatch exceptionLatch = new CountDownLatch(1); LatchExceptionHandler latchExceptionHandler = new LatchExceptionHandler(exceptionLatch); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>( - ringBuffer, sequenceBarrier, new ExceptionEventHandler()); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, new ExceptionEventHandler()); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); batchEventProcessor.setExceptionHandler(latchExceptionHandler); @@ -180,8 +180,8 @@ public void onEvent(final StubEvent event, final long sequence, final boolean en } final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>( - ringBuffer, sequenceBarrier, new LoopbackEventHandler()); + new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, new LoopbackEventHandler()); ringBuffer.publish(ringBuffer.next()); ringBuffer.publish(ringBuffer.next()); @@ -207,7 +207,8 @@ public void shouldAlwaysHalt() throws InterruptedException DataProvider dp = sequence -> null; final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler(); - final BatchEventProcessor p1 = new BatchEventProcessor<>(dp, barrier, h1); + final BatchEventProcessor p1 = new BatchEventProcessorBuilder() + .build(dp, barrier, h1); Thread t1 = new Thread(p1); p1.halt(); @@ -219,7 +220,8 @@ public void shouldAlwaysHalt() throws InterruptedException for (int i = 0; i < 1000; i++) { final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler(); - final BatchEventProcessor p2 = new BatchEventProcessor<>(dp, barrier, h2); + final BatchEventProcessor p2 = new BatchEventProcessorBuilder() + .build(dp, barrier, h2); Thread t2 = new Thread(p2); t2.start(); p2.halt(); @@ -231,7 +233,8 @@ public void shouldAlwaysHalt() throws InterruptedException for (int i = 0; i < 1000; i++) { final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler(); - final BatchEventProcessor p2 = new BatchEventProcessor<>(dp, barrier, h2); + final BatchEventProcessor p2 = new BatchEventProcessorBuilder() + .build(dp, barrier, h2); Thread t2 = new Thread(p2); t2.start(); Thread.yield(); @@ -282,8 +285,8 @@ public void shouldNotPassZeroSizeToBatchStartAware() throws Exception { BatchAwareEventHandler eventHandler = new BatchAwareEventHandler(); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>( - ringBuffer, new DelegatingSequenceBarrier(this.sequenceBarrier), eventHandler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() + .build(ringBuffer, new DelegatingSequenceBarrier(this.sequenceBarrier), eventHandler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); diff --git a/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java b/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java index 09544abe4..104944b4f 100644 --- a/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java +++ b/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java @@ -34,7 +34,8 @@ public final class LifecycleAwareTest private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final LifecycleAwareEventHandler handler = new LifecycleAwareEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, handler); @Test public void shouldNotifyOfBatchProcessorLifecycle() throws Exception diff --git a/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java b/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java index f436be377..a96d40ffa 100644 --- a/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/MaxBatchSizeEventProcessorTest.java @@ -45,8 +45,9 @@ void setUp() countDownLatch = new CountDownLatch(PUBLISH_COUNT); eventHandler = new BatchLimitRecordingHandler(countDownLatch); - batchEventProcessor = new BatchEventProcessor<>( - ringBuffer, this.sequenceBarrier, eventHandler, MAX_BATCH_SIZE); + batchEventProcessor = new BatchEventProcessorBuilder() + .setMaxBatchSize(MAX_BATCH_SIZE) + .build(ringBuffer, this.sequenceBarrier, eventHandler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); diff --git a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java index 3365bff98..12f31eff2 100644 --- a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java @@ -419,10 +419,8 @@ private EventRangeExpectation event(final long sequenceStart, final long sequenc private BatchEventProcessor create(final TestEventHandler eventHandler) { - return new BatchEventProcessor<>( - ringBuffer, - ringBuffer.newBarrier(), - eventHandler); + return new BatchEventProcessorBuilder() + .build(ringBuffer, ringBuffer.newBarrier(), eventHandler); } private final class TestEventHandler implements RewindableEventHandler diff --git a/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java b/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java index 3928c7921..c8f3d0fbb 100644 --- a/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java +++ b/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java @@ -35,8 +35,8 @@ public void shouldReportProgressByUpdatingSequenceViaCallback() final RingBuffer ringBuffer = createMultiProducer(StubEvent.EVENT_FACTORY, 16); final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); final EventHandler handler = new TestSequenceReportingEventHandler(); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>( - ringBuffer, sequenceBarrier, handler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, handler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); Thread thread = new Thread(batchEventProcessor); diff --git a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java index 212deef7d..d5a27fe54 100644 --- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java +++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java @@ -16,6 +16,7 @@ package com.lmax.disruptor.dsl; import com.lmax.disruptor.BatchEventProcessor; +import com.lmax.disruptor.BatchEventProcessorBuilder; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; @@ -123,7 +124,7 @@ public void shouldBatchOfEvents() throws Exception disruptor.start(); - disruptor.publishEvents((event, sequence, arg) -> lastPublishedEvent = event, new Object[] { "a", "b" }); + disruptor.publishEvents((event, sequence, arg) -> lastPublishedEvent = event, new Object[]{"a", "b"}); if (!eventCounter.await(5, TimeUnit.SECONDS)) { @@ -135,12 +136,12 @@ public void shouldBatchOfEvents() throws Exception public void shouldAddEventProcessorsAfterPublishing() throws Exception { RingBuffer rb = disruptor.getRingBuffer(); - BatchEventProcessor b1 = new BatchEventProcessor<>( - rb, rb.newBarrier(), new SleepingEventHandler()); - BatchEventProcessor b2 = new BatchEventProcessor<>( - rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler()); - BatchEventProcessor b3 = new BatchEventProcessor<>( - rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler()); + BatchEventProcessor b1 = new BatchEventProcessorBuilder() + .build(rb, rb.newBarrier(), new SleepingEventHandler()); + BatchEventProcessor b2 = new BatchEventProcessorBuilder() + .build(rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler()); + BatchEventProcessor b3 = new BatchEventProcessorBuilder() + .build(rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler()); assertThat(b1.getSequence().get(), is(-1L)); assertThat(b2.getSequence().get(), is(-1L)); @@ -184,14 +185,14 @@ public void shouldSetSequenceForHandlerIfAddedAfterPublish() throws Exception @Test public void shouldCreateEventProcessorGroupForFirstEventProcessors() - throws Exception + throws Exception { executor.ignoreExecutions(); final EventHandler eventHandler1 = new SleepingEventHandler(); EventHandler eventHandler2 = new SleepingEventHandler(); final EventHandlerGroup eventHandlerGroup = - disruptor.handleEventsWith(eventHandler1, eventHandler2); + disruptor.handleEventsWith(eventHandler1, eventHandler2); disruptor.start(); assertNotNull(eventHandlerGroup); @@ -211,7 +212,7 @@ public void shouldMakeEntriesAvailableToFirstHandlersImmediately() throws Except @Test public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvailableToDependentEventProcessors() - throws Exception + throws Exception { DelayedEventHandler eventHandler1 = createDelayedEventHandler(); @@ -225,13 +226,15 @@ public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvai @Test public void shouldSupportAddingCustomEventProcessorWithFactory() - throws Exception + throws Exception { RingBuffer rb = disruptor.getRingBuffer(); - BatchEventProcessor b1 = new BatchEventProcessor<>( - rb, rb.newBarrier(), new SleepingEventHandler()); - EventProcessorFactory b2 = (ringBuffer, barrierSequences) -> new BatchEventProcessor<>( - ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler()); + BatchEventProcessor b1 = new BatchEventProcessorBuilder() + .build( + rb, rb.newBarrier(), new SleepingEventHandler()); + EventProcessorFactory b2 = (ringBuffer, barrierSequences) -> new BatchEventProcessorBuilder() + .build( + ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler()); disruptor.handleEventsWith(b1).then(b2); @@ -242,7 +245,7 @@ public void shouldSupportAddingCustomEventProcessorWithFactory() @Test public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor() - throws Exception + throws Exception { DelayedEventHandler handler1 = createDelayedEventHandler(); DelayedEventHandler handler2 = createDelayedEventHandler(); @@ -260,7 +263,7 @@ public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor() @Test public void shouldWaitOnAllProducersJoinedByAnd() - throws Exception + throws Exception { DelayedEventHandler handler1 = createDelayedEventHandler(); DelayedEventHandler handler2 = createDelayedEventHandler(); @@ -279,7 +282,7 @@ public void shouldWaitOnAllProducersJoinedByAnd() @Test public void shouldThrowExceptionIfHandlerIsNotAlreadyConsuming() - throws Exception + throws Exception { assertThrows(IllegalArgumentException.class, () -> disruptor.after(createDelayedEventHandler()).handleEventsWith(createDelayedEventHandler())); @@ -287,7 +290,7 @@ public void shouldThrowExceptionIfHandlerIsNotAlreadyConsuming() @Test public void shouldTrackEventHandlersByIdentityNotEquality() - throws Exception + throws Exception { assertThrows(IllegalArgumentException.class, () -> { @@ -304,7 +307,7 @@ public void shouldTrackEventHandlersByIdentityNotEquality() @SuppressWarnings("deprecation") @Test public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -323,7 +326,7 @@ public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors() @SuppressWarnings("deprecation") @Test public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnNewEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -342,7 +345,7 @@ public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnN @Test public void shouldSupportSpecifyingADefaultExceptionHandlerForEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -360,7 +363,7 @@ public void shouldSupportSpecifyingADefaultExceptionHandlerForEventProcessors() @Test public void shouldApplyDefaultExceptionHandlerToExistingEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -378,7 +381,7 @@ public void shouldApplyDefaultExceptionHandlerToExistingEventProcessors() @Test public void shouldBlockProducerUntilAllEventProcessorsHaveAdvanced() - throws Exception + throws Exception { final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); @@ -409,7 +412,7 @@ public void shouldBlockProducerUntilAllEventProcessorsHaveAdvanced() @Test public void shouldBeAbleToOverrideTheExceptionHandlerForAEventProcessor() - throws Exception + throws Exception { final RuntimeException testException = new RuntimeException(); final ExceptionThrowingEventHandler eventHandler = new ExceptionThrowingEventHandler(testException); @@ -426,7 +429,7 @@ public void shouldBeAbleToOverrideTheExceptionHandlerForAEventProcessor() @Test public void shouldThrowExceptionWhenAddingEventProcessorsAfterTheProducerBarrierHasBeenCreated() - throws Exception + throws Exception { assertThrows(IllegalStateException.class, () -> { @@ -439,7 +442,7 @@ public void shouldThrowExceptionWhenAddingEventProcessorsAfterTheProducerBarrier @Test public void shouldThrowExceptionIfStartIsCalledTwice() - throws Exception + throws Exception { assertThrows(IllegalStateException.class, () -> { @@ -452,7 +455,7 @@ public void shouldThrowExceptionIfStartIsCalledTwice() @Test public void shouldSupportCustomProcessorsAsDependencies() - throws Exception + throws Exception { RingBuffer ringBuffer = disruptor.getRingBuffer(); @@ -462,7 +465,8 @@ public void shouldSupportCustomProcessorsAsDependencies() EventHandler handlerWithBarrier = new EventHandlerStub<>(countDownLatch); final BatchEventProcessor processor = - new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); + new BatchEventProcessorBuilder() + .build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); disruptor.handleEventsWith(processor).then(handlerWithBarrier); @@ -473,7 +477,7 @@ public void shouldSupportCustomProcessorsAsDependencies() @Test public void shouldSupportHandlersAsDependenciesToCustomProcessors() - throws Exception + throws Exception { final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); @@ -485,7 +489,8 @@ public void shouldSupportHandlersAsDependenciesToCustomProcessors() final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler).asSequenceBarrier(); final BatchEventProcessor processor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handlerWithBarrier); + new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, handlerWithBarrier); disruptor.handleEventsWith(processor); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); @@ -507,7 +512,8 @@ public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exce final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler1).asSequenceBarrier(); final BatchEventProcessor processor = - new BatchEventProcessor<>(ringBuffer, sequenceBarrier, delayedEventHandler2); + new BatchEventProcessorBuilder() + .build(ringBuffer, sequenceBarrier, delayedEventHandler2); disruptor.after(delayedEventHandler1).and(processor).handleEventsWith(handlerWithBarrier); @@ -525,11 +531,13 @@ public void shouldSupportMultipleCustomProcessorsAsDependencies() throws Excepti final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler(); final BatchEventProcessor processor1 = - new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler1); + new BatchEventProcessorBuilder() + .build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler1); final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler(); final BatchEventProcessor processor2 = - new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler2); + new BatchEventProcessorBuilder() + .build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler2); disruptor.handleEventsWith(processor1, processor2); disruptor.after(processor1, processor2).handleEventsWith(handlerWithBarrier); @@ -610,9 +618,10 @@ public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throw { assertEquals(0, barrierSequences.length, "Should not have had any barrier sequences"); - return new BatchEventProcessor<>( - disruptor.getRingBuffer(), ringBuffer.newBarrier( - barrierSequences), eventHandler); + return new BatchEventProcessorBuilder() + .build( + disruptor.getRingBuffer(), ringBuffer.newBarrier( + barrierSequences), eventHandler); }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); @@ -629,18 +638,19 @@ public void shouldHonourDependenciesForCustomProcessors() throws Exception (ringBuffer, barrierSequences) -> { assertSame(1, barrierSequences.length, "Should have had a barrier sequence"); - return new BatchEventProcessor<>( - disruptor.getRingBuffer(), ringBuffer.newBarrier( - barrierSequences), eventHandler); + return new BatchEventProcessorBuilder() + .build( + disruptor.getRingBuffer(), ringBuffer.newBarrier( + barrierSequences), eventHandler); }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); } private void ensureTwoEventsProcessedAccordingToDependencies( - final CountDownLatch countDownLatch, - final DelayedEventHandler... dependencies) - throws InterruptedException, BrokenBarrierException + final CountDownLatch countDownLatch, + final DelayedEventHandler... dependencies) + throws InterruptedException, BrokenBarrierException { publishEvent(); publishEvent(); @@ -656,13 +666,13 @@ private void ensureTwoEventsProcessedAccordingToDependencies( } private void assertProducerReaches( - final StubPublisher stubPublisher, - final int expectedPublicationCount, - final boolean strict) + final StubPublisher stubPublisher, + final int expectedPublicationCount, + final boolean strict) { long loopStart = System.currentTimeMillis(); while (stubPublisher.getPublicationCount() < expectedPublicationCount && System - .currentTimeMillis() - loopStart < 5000) + .currentTimeMillis() - loopStart < 5000) { Thread.yield(); } @@ -725,14 +735,14 @@ private DelayedEventHandler createDelayedEventHandler() } private void assertThatCountDownLatchEquals( - final CountDownLatch countDownLatch, - final long expectedCountDownValue) + final CountDownLatch countDownLatch, + final long expectedCountDownValue) { assertThat(Long.valueOf(countDownLatch.getCount()), equalTo(Long.valueOf(expectedCountDownValue))); } private void assertThatCountDownLatchIsZero(final CountDownLatch countDownLatch) - throws InterruptedException + throws InterruptedException { boolean released = countDownLatch.await(TIMEOUT_IN_SECONDS, SECONDS); assertTrue(released, "Batch handler did not receive entries: " + countDownLatch.getCount()); From d9fefddb24f28a8e8dfd5464029d6c01f4c1e9f2 Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Sat, 18 Mar 2023 17:15:48 +0000 Subject: [PATCH 5/6] Move mutable rewind strategy to builder where it can be scoped to only the rewind supporting build method --- .../lmax/disruptor/BatchEventProcessor.java | 31 +++------ .../disruptor/BatchEventProcessorBuilder.java | 13 +++- .../RewindBatchEventProcessorTest.java | 64 +++++++++++-------- 3 files changed, 57 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 0e64cff8e..a493f6732 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -42,14 +42,14 @@ public final class BatchEventProcessor private final int batchLimitOffset; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final RewindHandler rewindHandler; - private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy(); private int retriesAttempted = 0; BatchEventProcessor( final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, final EventHandlerBase eventHandler, - final int maxBatchSize + final int maxBatchSize, + final BatchRewindStrategy batchRewindStrategy ) { this.dataProvider = dataProvider; @@ -63,7 +63,7 @@ public final class BatchEventProcessor this.batchLimitOffset = maxBatchSize - 1; this.rewindHandler = eventHandler instanceof RewindableEventHandler - ? new TryRewindHandler() + ? new TryRewindHandler(batchRewindStrategy) : new NoRewindHandler(); } @@ -101,24 +101,6 @@ public void setExceptionHandler(final ExceptionHandler exceptionHandl this.exceptionHandler = exceptionHandler; } - /** - * Set a new {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException} - * Which can include whether the batch should be rewound and reattempted, - * or simply thrown and move on to the next sequence - * the default is a {@link SimpleBatchRewindStrategy} which always rewinds - * - * @param batchRewindStrategy to replace the existing rewindStrategy. - */ - public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy) - { - if (null == batchRewindStrategy) - { - throw new NullPointerException(); - } - - this.batchRewindStrategy = batchRewindStrategy; - } - /** * It is ok to have another thread rerun this method after a halt(). * @@ -298,6 +280,13 @@ private ExceptionHandler getExceptionHandler() private class TryRewindHandler implements RewindHandler { + private final BatchRewindStrategy batchRewindStrategy; + + TryRewindHandler(final BatchRewindStrategy batchRewindStrategy) + { + this.batchRewindStrategy = batchRewindStrategy; + } + @Override public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException { diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java b/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java index 77e8ff9ae..2b02c9918 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java @@ -51,7 +51,7 @@ public BatchEventProcessor build( final EventHandler eventHandler) { final BatchEventProcessor processor = new BatchEventProcessor<>( - dataProvider, sequenceBarrier, eventHandler, maxBatchSize + dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null ); eventHandler.setSequenceCallback(processor.getSequence()); @@ -65,16 +65,23 @@ public BatchEventProcessor build( * @param dataProvider to which events are published. * @param sequenceBarrier on which it is waiting. * @param rewindableEventHandler is the delegate to which events are dispatched. + * @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}. * @param event implementation storing the data for sharing during exchange or parallel coordination of an event. * @return the BatchEventProcessor */ public BatchEventProcessor build( final DataProvider dataProvider, final SequenceBarrier sequenceBarrier, - final RewindableEventHandler rewindableEventHandler) + final RewindableEventHandler rewindableEventHandler, + final BatchRewindStrategy batchRewindStrategy) { + if (null == batchRewindStrategy) + { + throw new NullPointerException("batchRewindStrategy cannot be null when building a BatchEventProcessor"); + } + return new BatchEventProcessor<>( - dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize + dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, batchRewindStrategy ); } } diff --git a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java index 12f31eff2..8ec2f5cbd 100644 --- a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java @@ -19,6 +19,7 @@ import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class RewindBatchEventProcessorTest { @@ -38,7 +39,7 @@ public void shouldRewindOnFirstEventOfBatchSizeOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, asList(rewind(0, 1)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -59,7 +60,7 @@ public void shouldRewindOnFirstEventOfBatch() singletonList(rewind(0, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -80,7 +81,7 @@ public void shouldRewindOnEventInMiddleOfBatch() singletonList(rewind(8, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -104,7 +105,7 @@ public void shouldRewindOnLastEventOfBatch() -1 ); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -125,7 +126,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatch() singletonList(rewind(4, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -141,7 +142,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatchOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 1)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -161,7 +162,7 @@ public void shouldRewindMultipleTimes() singletonList(rewind(8, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -184,7 +185,7 @@ public void shouldRewindMultipleTimesOnLastEventInBatch() singletonList(rewind(lastSequenceNumber, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -207,7 +208,7 @@ public void shouldRewindMultipleTimesInSameBatch() asList(rewind(5, 3), rewind(7, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -228,7 +229,7 @@ public void shouldRewindMultipleTimesOnBatchOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 3)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -246,7 +247,7 @@ public void shouldFallOverWhenNonRewindableExceptionIsThrown() fill(ringBuffer, ringBufferEntries); final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, 8); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); AtomicReference exceptionHandled = new AtomicReference<>(); @@ -263,7 +264,7 @@ public void shouldProcessUpToMaxBatchSizeForEachGivenBatch() fill(ringBuffer, ringBufferEntries); final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -285,7 +286,7 @@ public void shouldOnlyRewindBatch() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -308,11 +309,11 @@ void shouldInvokeRewindPauseStrategyOnRewind() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); + final BatchEventProcessor eventProcessor = create(eventHandler, rewindPauseStrategy); + eventHandler.setRewindable(eventProcessor); - CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); - eventProcessor.setRewindStrategy(rewindPauseStrategy); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -335,11 +336,11 @@ void shouldNotInvokeRewindPauseStrategyWhenNoRewindsOccur() singletonList(rewind(-1, -1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); + final BatchEventProcessor eventProcessor = create(eventHandler, rewindPauseStrategy); + eventHandler.setRewindable(eventProcessor); - CountingBatchRewindStrategy rewindPauseStrategy = new CountingBatchRewindStrategy(); - eventProcessor.setRewindStrategy(rewindPauseStrategy); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -359,10 +360,10 @@ void shouldCopeWithTheNanosecondRewindPauseStrategy() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + final BatchEventProcessor eventProcessor = create(eventHandler, new NanosecondPauseBatchRewindStrategy(1000)); + eventHandler.setRewindable(eventProcessor); - eventProcessor.setRewindStrategy(new NanosecondPauseBatchRewindStrategy(1000)); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -381,19 +382,18 @@ void shouldGiveUpWhenUsingTheGiveUpRewindStrategy() int lastSequenceNumber = ringBufferEntries - 1; fill(ringBuffer, ringBufferEntries); - EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3); - final TestEventHandler eventHandler = new TestEventHandler(values, asList(rewind(15, 99), rewind(25, 99)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler); + EventuallyGiveUpBatchRewindStrategy batchRewindStrategy = new EventuallyGiveUpBatchRewindStrategy(3); + final BatchEventProcessor eventProcessor = create(eventHandler, batchRewindStrategy); + eventHandler.setRewindable(eventProcessor); AtomicReference exceptionHandled = new AtomicReference<>(); eventProcessor.setExceptionHandler(new StubExceptionHandler(exceptionHandled)); - eventProcessor.setRewindStrategy(batchRewindStrategy); eventProcessor.run(); assertThat(values, containsExactSequence( @@ -406,6 +406,16 @@ void shouldGiveUpWhenUsingTheGiveUpRewindStrategy() event(26, lastSequenceNumber))); // unable to process 25 so it ends up skipping it } + @Test + void shouldNotAllowNullBatchRewindStrategy() + { + final TestEventHandler eventHandler = new TestEventHandler(values, + asList(rewind(15, 99), rewind(25, 99)), + -1, + -1); + final BatchEventProcessorBuilder batchEventProcessorBuilder = new BatchEventProcessorBuilder(); + assertThrows(NullPointerException.class, () -> batchEventProcessorBuilder.build(ringBuffer, ringBuffer.newBarrier(), eventHandler, null)); + } private static ForceRewindSequence rewind(final long sequenceNumberToFailOn, final long timesToFail) { @@ -417,10 +427,10 @@ private EventRangeExpectation event(final long sequenceStart, final long sequenc return new EventRangeExpectation(sequenceStart, sequenceEnd, false); } - private BatchEventProcessor create(final TestEventHandler eventHandler) + private BatchEventProcessor create(final TestEventHandler eventHandler, final BatchRewindStrategy batchRewindStrategy) { return new BatchEventProcessorBuilder() - .build(ringBuffer, ringBuffer.newBarrier(), eventHandler); + .build(ringBuffer, ringBuffer.newBarrier(), eventHandler, batchRewindStrategy); } private final class TestEventHandler implements RewindableEventHandler From 63d5af7b9e1773eb5def4bfe2f9cf7a74c952f3b Mon Sep 17 00:00:00 2001 From: Nick Palmer Date: Sat, 18 Mar 2023 18:38:43 +0000 Subject: [PATCH 6/6] Revert mistaken test reformatting An overly keen find & replace made more changes than necessary while changing from direct BatchEventProcessor construction to using the builder. --- .../com/lmax/disruptor/dsl/Disruptor.java | 4 +- .../disruptor/immutable/CustomRingBuffer.java | 6 +- .../disruptor/BatchEventProcessorTest.java | 29 +++-- .../lmax/disruptor/LifecycleAwareTest.java | 3 +- .../RewindBatchEventProcessorTest.java | 38 +++--- .../SequenceReportingCallbackTest.java | 4 +- .../com/lmax/disruptor/dsl/DisruptorTest.java | 109 ++++++++---------- 7 files changed, 95 insertions(+), 98 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java index 93ca6c8cd..be203c685 100644 --- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java +++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java @@ -502,8 +502,8 @@ EventHandlerGroup createEventProcessors( { final EventHandler eventHandler = eventHandlers[i]; - final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() - .build(ringBuffer, barrier, eventHandler); + final BatchEventProcessor batchEventProcessor = + new BatchEventProcessorBuilder().build(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { diff --git a/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java b/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java index 91ba5d57d..847bac025 100644 --- a/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java +++ b/src/perftest/java/com/lmax/disruptor/immutable/CustomRingBuffer.java @@ -80,8 +80,10 @@ public EventAccessor get(final long sequence) public BatchEventProcessor> createHandler(final EventHandler handler) { BatchEventProcessor> processor = - new BatchEventProcessorBuilder() - .build(this, sequencer.newBarrier(), new AccessorEventHandler<>(handler)); + new BatchEventProcessorBuilder().build( + this, + sequencer.newBarrier(), + new AccessorEventHandler<>(handler)); sequencer.addGatingSequences(processor.getSequence()); return processor; diff --git a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java index c6cd3ee9c..acd9b4078 100644 --- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java @@ -44,8 +44,8 @@ public void shouldThrowExceptionOnSettingNullExceptionHandler() { assertThrows(NullPointerException.class, () -> { - final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, new ExceptionEventHandler()); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder().build( + ringBuffer, sequenceBarrier, new ExceptionEventHandler()); batchEventProcessor.setExceptionHandler(null); }); } @@ -56,8 +56,8 @@ public void shouldCallMethodsInLifecycleOrderForBatch() { CountDownLatch eventLatch = new CountDownLatch(3); LatchEventHandler eventHandler = new LatchEventHandler(eventLatch); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, eventHandler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder().build( + ringBuffer, sequenceBarrier, eventHandler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); @@ -80,8 +80,8 @@ public void shouldCallExceptionHandlerOnUncaughtException() { CountDownLatch exceptionLatch = new CountDownLatch(1); LatchExceptionHandler latchExceptionHandler = new LatchExceptionHandler(exceptionLatch); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, new ExceptionEventHandler()); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder().build( + ringBuffer, sequenceBarrier, new ExceptionEventHandler()); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); batchEventProcessor.setExceptionHandler(latchExceptionHandler); @@ -180,8 +180,8 @@ public void onEvent(final StubEvent event, final long sequence, final boolean en } final BatchEventProcessor batchEventProcessor = - new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, new LoopbackEventHandler()); + new BatchEventProcessorBuilder().build( + ringBuffer, sequenceBarrier, new LoopbackEventHandler()); ringBuffer.publish(ringBuffer.next()); ringBuffer.publish(ringBuffer.next()); @@ -207,8 +207,7 @@ public void shouldAlwaysHalt() throws InterruptedException DataProvider dp = sequence -> null; final LatchLifeCycleHandler h1 = new LatchLifeCycleHandler(); - final BatchEventProcessor p1 = new BatchEventProcessorBuilder() - .build(dp, barrier, h1); + final BatchEventProcessor p1 = new BatchEventProcessorBuilder().build(dp, barrier, h1); Thread t1 = new Thread(p1); p1.halt(); @@ -220,8 +219,7 @@ public void shouldAlwaysHalt() throws InterruptedException for (int i = 0; i < 1000; i++) { final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler(); - final BatchEventProcessor p2 = new BatchEventProcessorBuilder() - .build(dp, barrier, h2); + final BatchEventProcessor p2 = new BatchEventProcessorBuilder().build(dp, barrier, h2); Thread t2 = new Thread(p2); t2.start(); p2.halt(); @@ -233,8 +231,7 @@ public void shouldAlwaysHalt() throws InterruptedException for (int i = 0; i < 1000; i++) { final LatchLifeCycleHandler h2 = new LatchLifeCycleHandler(); - final BatchEventProcessor p2 = new BatchEventProcessorBuilder() - .build(dp, barrier, h2); + final BatchEventProcessor p2 = new BatchEventProcessorBuilder().build(dp, barrier, h2); Thread t2 = new Thread(p2); t2.start(); Thread.yield(); @@ -285,8 +282,8 @@ public void shouldNotPassZeroSizeToBatchStartAware() throws Exception { BatchAwareEventHandler eventHandler = new BatchAwareEventHandler(); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() - .build(ringBuffer, new DelegatingSequenceBarrier(this.sequenceBarrier), eventHandler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder().build( + ringBuffer, new DelegatingSequenceBarrier(this.sequenceBarrier), eventHandler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); diff --git a/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java b/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java index 104944b4f..7c23e2da6 100644 --- a/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java +++ b/src/test/java/com/lmax/disruptor/LifecycleAwareTest.java @@ -34,8 +34,7 @@ public final class LifecycleAwareTest private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); private final LifecycleAwareEventHandler handler = new LifecycleAwareEventHandler(); private final BatchEventProcessor batchEventProcessor = - new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, handler); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handler); @Test public void shouldNotifyOfBatchProcessorLifecycle() throws Exception diff --git a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java index 8ec2f5cbd..0ed6bd4d4 100644 --- a/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/RewindBatchEventProcessorTest.java @@ -39,7 +39,7 @@ public void shouldRewindOnFirstEventOfBatchSizeOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, asList(rewind(0, 1)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -60,7 +60,7 @@ public void shouldRewindOnFirstEventOfBatch() singletonList(rewind(0, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -81,7 +81,7 @@ public void shouldRewindOnEventInMiddleOfBatch() singletonList(rewind(8, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -105,7 +105,7 @@ public void shouldRewindOnLastEventOfBatch() -1 ); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -126,7 +126,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatch() singletonList(rewind(4, 1)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -142,7 +142,7 @@ public void shouldRunBatchCompleteOnLastEventOfBatchOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 1)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -162,7 +162,7 @@ public void shouldRewindMultipleTimes() singletonList(rewind(8, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -185,7 +185,7 @@ public void shouldRewindMultipleTimesOnLastEventInBatch() singletonList(rewind(lastSequenceNumber, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -208,7 +208,7 @@ public void shouldRewindMultipleTimesInSameBatch() asList(rewind(5, 3), rewind(7, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -229,7 +229,7 @@ public void shouldRewindMultipleTimesOnBatchOfOne() fill(ringBuffer, 1); final TestEventHandler eventHandler = new TestEventHandler(values, singletonList(rewind(0, 3)), 0, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -247,7 +247,7 @@ public void shouldFallOverWhenNonRewindableExceptionIsThrown() fill(ringBuffer, ringBufferEntries); final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, 8); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); AtomicReference exceptionHandled = new AtomicReference<>(); @@ -264,7 +264,7 @@ public void shouldProcessUpToMaxBatchSizeForEachGivenBatch() fill(ringBuffer, ringBufferEntries); final TestEventHandler eventHandler = new TestEventHandler(values, emptyList(), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -286,7 +286,7 @@ public void shouldOnlyRewindBatch() singletonList(rewind(15, 3)), lastSequenceNumber, -1); - final BatchEventProcessor eventProcessor = create(eventHandler, new SimpleBatchRewindStrategy()); + final BatchEventProcessor eventProcessor = create(eventHandler); eventHandler.setRewindable(eventProcessor); eventProcessor.run(); @@ -427,10 +427,18 @@ private EventRangeExpectation event(final long sequenceStart, final long sequenc return new EventRangeExpectation(sequenceStart, sequenceEnd, false); } + private BatchEventProcessor create(final TestEventHandler eventHandler) + { + return create(eventHandler, new SimpleBatchRewindStrategy()); + } + private BatchEventProcessor create(final TestEventHandler eventHandler, final BatchRewindStrategy batchRewindStrategy) { - return new BatchEventProcessorBuilder() - .build(ringBuffer, ringBuffer.newBarrier(), eventHandler, batchRewindStrategy); + return new BatchEventProcessorBuilder().build( + ringBuffer, + ringBuffer.newBarrier(), + eventHandler, + batchRewindStrategy); } private final class TestEventHandler implements RewindableEventHandler diff --git a/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java b/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java index c8f3d0fbb..0bf1085fb 100644 --- a/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java +++ b/src/test/java/com/lmax/disruptor/SequenceReportingCallbackTest.java @@ -35,8 +35,8 @@ public void shouldReportProgressByUpdatingSequenceViaCallback() final RingBuffer ringBuffer = createMultiProducer(StubEvent.EVENT_FACTORY, 16); final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); final EventHandler handler = new TestSequenceReportingEventHandler(); - final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, handler); + final BatchEventProcessor batchEventProcessor = new BatchEventProcessorBuilder().build( + ringBuffer, sequenceBarrier, handler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); Thread thread = new Thread(batchEventProcessor); diff --git a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java index d5a27fe54..679187347 100644 --- a/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java +++ b/src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java @@ -124,7 +124,7 @@ public void shouldBatchOfEvents() throws Exception disruptor.start(); - disruptor.publishEvents((event, sequence, arg) -> lastPublishedEvent = event, new Object[]{"a", "b"}); + disruptor.publishEvents((event, sequence, arg) -> lastPublishedEvent = event, new Object[] { "a", "b" }); if (!eventCounter.await(5, TimeUnit.SECONDS)) { @@ -136,12 +136,12 @@ public void shouldBatchOfEvents() throws Exception public void shouldAddEventProcessorsAfterPublishing() throws Exception { RingBuffer rb = disruptor.getRingBuffer(); - BatchEventProcessor b1 = new BatchEventProcessorBuilder() - .build(rb, rb.newBarrier(), new SleepingEventHandler()); - BatchEventProcessor b2 = new BatchEventProcessorBuilder() - .build(rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler()); - BatchEventProcessor b3 = new BatchEventProcessorBuilder() - .build(rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler()); + BatchEventProcessor b1 = new BatchEventProcessorBuilder().build( + rb, rb.newBarrier(), new SleepingEventHandler()); + BatchEventProcessor b2 = new BatchEventProcessorBuilder().build( + rb, rb.newBarrier(b1.getSequence()), new SleepingEventHandler()); + BatchEventProcessor b3 = new BatchEventProcessorBuilder().build( + rb, rb.newBarrier(b2.getSequence()), new SleepingEventHandler()); assertThat(b1.getSequence().get(), is(-1L)); assertThat(b2.getSequence().get(), is(-1L)); @@ -185,14 +185,14 @@ public void shouldSetSequenceForHandlerIfAddedAfterPublish() throws Exception @Test public void shouldCreateEventProcessorGroupForFirstEventProcessors() - throws Exception + throws Exception { executor.ignoreExecutions(); final EventHandler eventHandler1 = new SleepingEventHandler(); EventHandler eventHandler2 = new SleepingEventHandler(); final EventHandlerGroup eventHandlerGroup = - disruptor.handleEventsWith(eventHandler1, eventHandler2); + disruptor.handleEventsWith(eventHandler1, eventHandler2); disruptor.start(); assertNotNull(eventHandlerGroup); @@ -212,7 +212,7 @@ public void shouldMakeEntriesAvailableToFirstHandlersImmediately() throws Except @Test public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvailableToDependentEventProcessors() - throws Exception + throws Exception { DelayedEventHandler eventHandler1 = createDelayedEventHandler(); @@ -226,15 +226,13 @@ public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvai @Test public void shouldSupportAddingCustomEventProcessorWithFactory() - throws Exception + throws Exception { RingBuffer rb = disruptor.getRingBuffer(); - BatchEventProcessor b1 = new BatchEventProcessorBuilder() - .build( - rb, rb.newBarrier(), new SleepingEventHandler()); - EventProcessorFactory b2 = (ringBuffer, barrierSequences) -> new BatchEventProcessorBuilder() - .build( - ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler()); + BatchEventProcessor b1 = new BatchEventProcessorBuilder().build( + rb, rb.newBarrier(), new SleepingEventHandler()); + EventProcessorFactory b2 = (ringBuffer, barrierSequences) -> new BatchEventProcessorBuilder().build( + ringBuffer, ringBuffer.newBarrier(barrierSequences), new SleepingEventHandler()); disruptor.handleEventsWith(b1).then(b2); @@ -245,7 +243,7 @@ public void shouldSupportAddingCustomEventProcessorWithFactory() @Test public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor() - throws Exception + throws Exception { DelayedEventHandler handler1 = createDelayedEventHandler(); DelayedEventHandler handler2 = createDelayedEventHandler(); @@ -263,7 +261,7 @@ public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor() @Test public void shouldWaitOnAllProducersJoinedByAnd() - throws Exception + throws Exception { DelayedEventHandler handler1 = createDelayedEventHandler(); DelayedEventHandler handler2 = createDelayedEventHandler(); @@ -282,7 +280,7 @@ public void shouldWaitOnAllProducersJoinedByAnd() @Test public void shouldThrowExceptionIfHandlerIsNotAlreadyConsuming() - throws Exception + throws Exception { assertThrows(IllegalArgumentException.class, () -> disruptor.after(createDelayedEventHandler()).handleEventsWith(createDelayedEventHandler())); @@ -290,7 +288,7 @@ public void shouldThrowExceptionIfHandlerIsNotAlreadyConsuming() @Test public void shouldTrackEventHandlersByIdentityNotEquality() - throws Exception + throws Exception { assertThrows(IllegalArgumentException.class, () -> { @@ -307,7 +305,7 @@ public void shouldTrackEventHandlersByIdentityNotEquality() @SuppressWarnings("deprecation") @Test public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -326,7 +324,7 @@ public void shouldSupportSpecifyingAExceptionHandlerForEventProcessors() @SuppressWarnings("deprecation") @Test public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnNewEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -345,7 +343,7 @@ public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnN @Test public void shouldSupportSpecifyingADefaultExceptionHandlerForEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -363,7 +361,7 @@ public void shouldSupportSpecifyingADefaultExceptionHandlerForEventProcessors() @Test public void shouldApplyDefaultExceptionHandlerToExistingEventProcessors() - throws Exception + throws Exception { AtomicReference eventHandled = new AtomicReference<>(); ExceptionHandler exceptionHandler = new StubExceptionHandler(eventHandled); @@ -381,7 +379,7 @@ public void shouldApplyDefaultExceptionHandlerToExistingEventProcessors() @Test public void shouldBlockProducerUntilAllEventProcessorsHaveAdvanced() - throws Exception + throws Exception { final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); @@ -412,7 +410,7 @@ public void shouldBlockProducerUntilAllEventProcessorsHaveAdvanced() @Test public void shouldBeAbleToOverrideTheExceptionHandlerForAEventProcessor() - throws Exception + throws Exception { final RuntimeException testException = new RuntimeException(); final ExceptionThrowingEventHandler eventHandler = new ExceptionThrowingEventHandler(testException); @@ -429,7 +427,7 @@ public void shouldBeAbleToOverrideTheExceptionHandlerForAEventProcessor() @Test public void shouldThrowExceptionWhenAddingEventProcessorsAfterTheProducerBarrierHasBeenCreated() - throws Exception + throws Exception { assertThrows(IllegalStateException.class, () -> { @@ -442,7 +440,7 @@ public void shouldThrowExceptionWhenAddingEventProcessorsAfterTheProducerBarrier @Test public void shouldThrowExceptionIfStartIsCalledTwice() - throws Exception + throws Exception { assertThrows(IllegalStateException.class, () -> { @@ -455,7 +453,7 @@ public void shouldThrowExceptionIfStartIsCalledTwice() @Test public void shouldSupportCustomProcessorsAsDependencies() - throws Exception + throws Exception { RingBuffer ringBuffer = disruptor.getRingBuffer(); @@ -465,8 +463,7 @@ public void shouldSupportCustomProcessorsAsDependencies() EventHandler handlerWithBarrier = new EventHandlerStub<>(countDownLatch); final BatchEventProcessor processor = - new BatchEventProcessorBuilder() - .build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); + new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); disruptor.handleEventsWith(processor).then(handlerWithBarrier); @@ -477,7 +474,7 @@ public void shouldSupportCustomProcessorsAsDependencies() @Test public void shouldSupportHandlersAsDependenciesToCustomProcessors() - throws Exception + throws Exception { final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); @@ -489,8 +486,7 @@ public void shouldSupportHandlersAsDependenciesToCustomProcessors() final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler).asSequenceBarrier(); final BatchEventProcessor processor = - new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, handlerWithBarrier); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, handlerWithBarrier); disruptor.handleEventsWith(processor); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); @@ -512,8 +508,7 @@ public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exce final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler1).asSequenceBarrier(); final BatchEventProcessor processor = - new BatchEventProcessorBuilder() - .build(ringBuffer, sequenceBarrier, delayedEventHandler2); + new BatchEventProcessorBuilder().build(ringBuffer, sequenceBarrier, delayedEventHandler2); disruptor.after(delayedEventHandler1).and(processor).handleEventsWith(handlerWithBarrier); @@ -531,13 +526,11 @@ public void shouldSupportMultipleCustomProcessorsAsDependencies() throws Excepti final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler(); final BatchEventProcessor processor1 = - new BatchEventProcessorBuilder() - .build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler1); + new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler1); final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler(); final BatchEventProcessor processor2 = - new BatchEventProcessorBuilder() - .build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler2); + new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler2); disruptor.handleEventsWith(processor1, processor2); disruptor.after(processor1, processor2).handleEventsWith(handlerWithBarrier); @@ -618,10 +611,9 @@ public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throw { assertEquals(0, barrierSequences.length, "Should not have had any barrier sequences"); - return new BatchEventProcessorBuilder() - .build( - disruptor.getRingBuffer(), ringBuffer.newBarrier( - barrierSequences), eventHandler); + return new BatchEventProcessorBuilder().build( + disruptor.getRingBuffer(), ringBuffer.newBarrier( + barrierSequences), eventHandler); }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); @@ -638,19 +630,18 @@ public void shouldHonourDependenciesForCustomProcessors() throws Exception (ringBuffer, barrierSequences) -> { assertSame(1, barrierSequences.length, "Should have had a barrier sequence"); - return new BatchEventProcessorBuilder() - .build( - disruptor.getRingBuffer(), ringBuffer.newBarrier( - barrierSequences), eventHandler); + return new BatchEventProcessorBuilder().build( + disruptor.getRingBuffer(), ringBuffer.newBarrier( + barrierSequences), eventHandler); }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); } private void ensureTwoEventsProcessedAccordingToDependencies( - final CountDownLatch countDownLatch, - final DelayedEventHandler... dependencies) - throws InterruptedException, BrokenBarrierException + final CountDownLatch countDownLatch, + final DelayedEventHandler... dependencies) + throws InterruptedException, BrokenBarrierException { publishEvent(); publishEvent(); @@ -666,13 +657,13 @@ private void ensureTwoEventsProcessedAccordingToDependencies( } private void assertProducerReaches( - final StubPublisher stubPublisher, - final int expectedPublicationCount, - final boolean strict) + final StubPublisher stubPublisher, + final int expectedPublicationCount, + final boolean strict) { long loopStart = System.currentTimeMillis(); while (stubPublisher.getPublicationCount() < expectedPublicationCount && System - .currentTimeMillis() - loopStart < 5000) + .currentTimeMillis() - loopStart < 5000) { Thread.yield(); } @@ -735,14 +726,14 @@ private DelayedEventHandler createDelayedEventHandler() } private void assertThatCountDownLatchEquals( - final CountDownLatch countDownLatch, - final long expectedCountDownValue) + final CountDownLatch countDownLatch, + final long expectedCountDownValue) { assertThat(Long.valueOf(countDownLatch.getCount()), equalTo(Long.valueOf(expectedCountDownValue))); } private void assertThatCountDownLatchIsZero(final CountDownLatch countDownLatch) - throws InterruptedException + throws InterruptedException { boolean released = countDownLatch.await(TIMEOUT_IN_SECONDS, SECONDS); assertTrue(released, "Batch handler did not receive entries: " + countDownLatch.getCount());