Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewindable event handler separation #440

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.lmax.disruptor.examples;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BatchEventProcessorBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
Expand Down Expand Up @@ -52,11 +53,11 @@ public static void main(final String[] args) throws InterruptedException
// Construct 2 batch event processors.
DynamicHandler handler1 = new DynamicHandler();
BatchEventProcessor<StubEvent> processor1 =
new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1);
new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(), handler1);

DynamicHandler handler2 = new DynamicHandler();
BatchEventProcessor<StubEvent> processor2 =
new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2);
new BatchEventProcessorBuilder().build(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2);

// Dynamically add both sequences to the ring buffer
ringBuffer.addGatingSequences(processor1.getSequence(), processor2.getSequence());
Expand Down
111 changes: 48 additions & 63 deletions src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011 LMAX Ltd.
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,66 +28,43 @@
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
public final class BatchEventProcessor<T>
implements EventProcessor
implements EventProcessor
{
private static final int IDLE = 0;
private static final int HALTED = IDLE + 1;
private static final int RUNNING = HALTED + 1;
private static final int DEFAULT_MAX_BATCH_SIZE = Integer.MAX_VALUE;

private final AtomicInteger running = new AtomicInteger(IDLE);
private ExceptionHandler<? super T> exceptionHandler;
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final EventHandlerBase<? super T> eventHandler;
private final int batchLimitOffset;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy();
private final RewindHandler rewindHandler;
private int retriesAttempted = 0;

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
* @param maxBatchSize limits number of events processed in a batch before updating the sequence.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler,
final int maxBatchSize
BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandlerBase<? super T> eventHandler,
final int maxBatchSize,
final BatchRewindStrategy batchRewindStrategy
)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;

if (maxBatchSize < 1)
{
throw new IllegalArgumentException("maxBatchSize must be greater than 0");
}
this.batchLimitOffset = maxBatchSize - 1;

eventHandler.setSequenceCallback(sequence);
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler
)
{
this(dataProvider, sequenceBarrier, eventHandler, DEFAULT_MAX_BATCH_SIZE);
this.rewindHandler = eventHandler instanceof RewindableEventHandler
? new TryRewindHandler(batchRewindStrategy)
: new NoRewindHandler();
}

@Override
Expand Down Expand Up @@ -124,23 +101,6 @@ public void setExceptionHandler(final ExceptionHandler<? super T> exceptionHandl
this.exceptionHandler = exceptionHandler;
}

/**
* Set a new {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}
* Which can include whether the batch should be rewound and reattempted,
* or simply thrown and move on to the next sequence
* the default is a {@link SimpleBatchRewindStrategy} which always rewinds
* @param batchRewindStrategy to replace the existing rewindStrategy.
*/
public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException();
}

this.batchRewindStrategy = batchRewindStrategy;
}

/**
* It is ok to have another thread rerun this method after a halt().
*
Expand Down Expand Up @@ -214,15 +174,7 @@ private void processEvents()
}
catch (final RewindableException e)
{
if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
{
nextSequence = startOfBatchSequence;
}
else
{
retriesAttempted = 0;
throw e;
}
nextSequence = rewindHandler.attemptRewindGetNextSequence(e, startOfBatchSequence);
}
}
catch (final TimeoutException e)
Expand Down Expand Up @@ -325,4 +277,37 @@ private ExceptionHandler<? super T> getExceptionHandler()
ExceptionHandler<? super T> handler = exceptionHandler;
return handler == null ? ExceptionHandlers.defaultHandler() : handler;
}

private class TryRewindHandler implements RewindHandler
{
private final BatchRewindStrategy batchRewindStrategy;

TryRewindHandler(final BatchRewindStrategy batchRewindStrategy)
{
this.batchRewindStrategy = batchRewindStrategy;
}

@Override
public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException
{
if (batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
{
return startOfBatchSequence;
}
else
{
retriesAttempted = 0;
throw e;
}
}
}

private static class NoRewindHandler implements RewindHandler
{
@Override
public long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence)
{
throw new UnsupportedOperationException("Rewindable Exception thrown from a non-rewindable event handler", e);
}
}
}
87 changes: 87 additions & 0 deletions src/main/java/com/lmax/disruptor/BatchEventProcessorBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2023 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.lmax.disruptor;

public final class BatchEventProcessorBuilder
{
private int maxBatchSize = Integer.MAX_VALUE;

/**
* Set the maximum number of events that will be processed in a batch before updating the sequence.
*
* @param maxBatchSize max number of events to process in one batch.
* @return The builder
*/
public BatchEventProcessorBuilder setMaxBatchSize(final int maxBatchSize)
{
this.maxBatchSize = maxBatchSize;
return this;
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* <p>The created {@link BatchEventProcessor} will not support batch rewind,
* but {@link EventHandler#setSequenceCallback(Sequence)} will be supported.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @return the BatchEventProcessor
*/
public <T> BatchEventProcessor<T> build(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler)
{
final BatchEventProcessor<T> processor = new BatchEventProcessor<>(
dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null
);
eventHandler.setSequenceCallback(processor.getSequence());

return processor;
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param rewindableEventHandler is the delegate to which events are dispatched.
* @param batchRewindStrategy a {@link BatchRewindStrategy} for customizing how to handle a {@link RewindableException}.
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @return the BatchEventProcessor
*/
public <T> BatchEventProcessor<T> build(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final RewindableEventHandler<? super T> rewindableEventHandler,
final BatchRewindStrategy batchRewindStrategy)
{
if (null == batchRewindStrategy)
{
throw new NullPointerException("batchRewindStrategy cannot be null when building a BatchEventProcessor");
}

return new BatchEventProcessor<>(
dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, batchRewindStrategy
);
}
}
42 changes: 3 additions & 39 deletions src/main/java/com/lmax/disruptor/EventHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011 LMAX Ltd.
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
*/
public interface EventHandler<T>
public interface EventHandler<T> extends EventHandlerBase<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
Expand All @@ -36,35 +36,9 @@ public interface EventHandler<T>
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
@Override
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

/**
* Invoked by {@link BatchEventProcessor} prior to processing a batch of events
*
* @param batchSize the size of the batch that is starting
* @param queueDepth the total number of queued up events including the batch about to be processed
*/
default void onBatchStart(long batchSize, long queueDepth)
{
}

/**
* Called once on thread start before first event is available.
*/
default void onStart()
{
}

/**
* Called once just before the event processing thread is shutdown.
*
* <p>Sequence event processing will already have stopped before this method is called. No events will
* be processed after this message.
*/
default void onShutdown()
{
}

/**
* Used by the {@link BatchEventProcessor} to set a callback allowing the {@link EventHandler} to notify
* when it has finished consuming an event if this happens after the {@link EventHandler#onEvent(Object, long, boolean)} call.
Expand All @@ -78,14 +52,4 @@ default void onShutdown()
default void setSequenceCallback(Sequence sequenceCallback)
{
}

/**
* Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}.
*
* @param sequence - the last processed sequence.
* @throws Exception if the implementation is unable to handle this timeout.
*/
default void onTimeout(long sequence) throws Exception
{
}
}
Loading