Package com.lmax.disruptor
The Disruptor is a concurrent programming framework for exchanging and coordinating work as a continuous series of events. It can be used as an alternative to wiring processing stages together via queues. The Disruptor design generates significantly less garbage than queues, and it separates the concurrency concerns so that non-locking algorithms can be employed, resulting in greater scalability and performance.
It works on the principle of having a number of stages that are each single-threaded with local state and memory. No global memory exists, and all communication is achieved by passing messages/state via managed ring buffers.
Almost any graph or pipeline structure can be composed via one or more Disruptor patterns.
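The ring-buffer mechanics behind this principle can be sketched in plain Java. What follows is a minimal illustration only, assuming exactly one publisher thread and one consumer thread. The `MiniRing` class and its method names are hypothetical and are not part of the Disruptor API, and the sketch leaves out batching, pluggable wait strategies and the other refinements of the real implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative single-publisher, single-consumer ring. Not the Disruptor implementation.
final class MiniRing {
    private final long[] slots;                              // preallocated and reused on every lap
    private final int mask;                                  // size - 1; size is a power of two
    private final AtomicLong cursor = new AtomicLong(-1);    // last published sequence
    private final AtomicLong consumed = new AtomicLong(-1);  // last consumed sequence
    private long nextToClaim = 0;                            // used by the publisher thread only

    MiniRing(int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Publisher side: claim the next sequence, wait until its slot is free, write, publish.
    void publish(long value) {
        long sequence = nextToClaim++;
        while (sequence - consumed.get() > slots.length) {
            Thread.yield();                                  // track the consumer to prevent wrap
        }
        slots[(int) (sequence & mask)] = value;
        cursor.set(sequence);                                // volatile write publishes the slot
    }

    // Consumer side: wait for the sequence to be published, read it, release the slot.
    long take(long sequence) {
        while (cursor.get() < sequence) {
            Thread.yield();                                  // waitFor
        }
        long value = slots[(int) (sequence & mask)];
        consumed.set(sequence);
        return value;
    }

    // Pushes 1..count through the ring on two threads and returns the consumer's sum.
    static long sumThroughRing(int size, int count) {
        MiniRing ring = new MiniRing(size);
        long[] sum = new long[1];
        Thread consumer = new Thread(() -> {
            for (long sequence = 0; sequence < count; sequence++) {
                sum[0] += ring.take(sequence);
            }
        });
        consumer.start();
        for (int i = 1; i <= count; i++) {
            ring.publish(i);
        }
        try {
            consumer.join();                                 // also makes sum[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumThroughRing(8, 1000));         // prints 500500
    }
}
```

The two counters play the roles shown in the diagrams that follow: the consumer waits on the publisher's cursor, and the publisher tracks the consumer's sequence so that it never overwrites a slot that has not been read.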
UniCast a series of items between 1 publisher and 1 EventProcessor.
                                          track to prevent wrap
                                          +------------------+
                                          |                  |
                                          |                  v
+----+    +-----+            +----+    +====+    +====+   +-----+
| P1 |--->| EP1 |            | P1 |--->| RB |<---| SB |   | EP1 |
+----+    +-----+            +----+    +====+    +====+   +-----+
                                  claim      get    ^        |
                                                    |        |
                                                    +--------+
                                                      waitFor
Sequence a series of messages from multiple publishers
                                         track to prevent wrap
                                         +--------------------+
                                         |                    |
                                         |                    v
+----+                       +----+    +====+    +====+    +-----+
| P1 |-------+               | P1 |--->| RB |<---| SB |    | EP1 |
+----+       |               +----+    +====+    +====+    +-----+
             v                           ^   get    ^         |
+----+    +-----+            +----+      |          |         |
| P2 |--->| EP1 |            | P2 |------+          +---------+
+----+    +-----+            +----+      |            waitFor
             ^                           |
+----+       |               +----+      |
| P3 |-------+               | P3 |------+
+----+                       +----+
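When several publishers share one ring buffer, each must obtain a distinct sequence before writing. A minimal sketch of that claiming step, assuming a single shared atomic counter, is shown below. `SharedClaimSketch` is a hypothetical name, not a Disruptor class, and the sketch deliberately omits the wrap check and the bookkeeping a real sequencer needs so that consumers never read a slot that has been claimed but not yet published.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative multi-publisher claim. Not the Disruptor's sequencer.
final class SharedClaimSketch {
    private final AtomicLong nextSequence = new AtomicLong(0);

    // Atomically hands out the next sequence; no two callers receive the same value.
    long claim() {
        return nextSequence.getAndIncrement();
    }

    // Runs several publisher threads and reports whether every sequence
    // in [0, publishers * claimsEach) was handed out exactly once.
    static boolean allClaimedExactlyOnce(int publishers, int claimsEach) {
        SharedClaimSketch sequencer = new SharedClaimSketch();
        AtomicIntegerArray hits = new AtomicIntegerArray(publishers * claimsEach);
        Thread[] threads = new Thread[publishers];
        for (int p = 0; p < publishers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < claimsEach; i++) {
                    hits.incrementAndGet((int) sequencer.claim());
                }
            });
            threads[p].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        for (int i = 0; i < hits.length(); i++) {
            if (hits.get(i) != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allClaimedExactlyOnce(3, 10_000));
    }
}
```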
Pipeline a series of messages
+----+    +-----+    +-----+    +-----+
| P1 |--->| EP1 |--->| EP2 |--->| EP3 |
+----+    +-----+    +-----+    +-----+

                          track to prevent wrap
             +----------------------------------------------------------------+
             |                                                                |
             |                                                                v
+----+    +====+    +=====+    +-----+    +=====+    +-----+    +=====+    +-----+
| P1 |--->| RB |    | SB1 |<---| EP1 |<---| SB2 |<---| EP2 |<---| SB3 |<---| EP3 |
+----+    +====+    +=====+    +-----+    +=====+    +-----+    +=====+    +-----+
     claim   ^  get    |   waitFor           |   waitFor           |   waitFor
             |         |                     |                     |
             +---------+---------------------+---------------------+
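The pipeline dependency chain can be illustrated without the library: each stage processes a sequence only once the stage before it has passed that sequence. The sketch below is a simplified model, assuming a buffer large enough that no wrap check is needed. `PipelineSketch` and its members are hypothetical names, and plain `AtomicLong` counters stand in for the Disruptor's sequences and barriers.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative three-stage pipeline over a shared array. Not the Disruptor implementation.
final class PipelineSketch {
    // Spin until the upstream sequence has reached the one we want to process.
    static void awaitSequence(AtomicLong upstream, long sequence) {
        while (upstream.get() < sequence) {
            Thread.yield();
        }
    }

    // Publishes 1..count; EP1 doubles each value, EP2 adds one, EP3 sums the results.
    static long run(int count) {
        long[] slots = new long[count];            // one slot per event, so the ring never wraps here
        AtomicLong cursor = new AtomicLong(-1);    // publisher progress
        AtomicLong ep1 = new AtomicLong(-1);       // EP1 progress, awaited by EP2
        AtomicLong ep2 = new AtomicLong(-1);       // EP2 progress, awaited by EP3
        long[] total = new long[1];

        Thread stage1 = new Thread(() -> {
            for (int s = 0; s < count; s++) {
                awaitSequence(cursor, s);
                slots[s] *= 2;
                ep1.set(s);
            }
        });
        Thread stage2 = new Thread(() -> {
            for (int s = 0; s < count; s++) {
                awaitSequence(ep1, s);
                slots[s] += 1;
                ep2.set(s);
            }
        });
        Thread stage3 = new Thread(() -> {
            for (int s = 0; s < count; s++) {
                awaitSequence(ep2, s);
                total[0] += slots[s];              // in a bounded ring, the publisher would track this stage
            }
        });

        Thread[] stages = {stage1, stage2, stage3};
        for (Thread stage : stages) {
            stage.start();
        }
        for (int s = 0; s < count; s++) {
            slots[s] = s + 1;
            cursor.set(s);                         // publish
        }
        for (Thread stage : stages) {
            try {
                stage.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(run(100));              // prints 10200
    }
}
```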
Multicast a series of messages to multiple EventProcessors
          +-----+                                   track to prevent wrap
   +----->| EP1 |                        +--------------------+----------+----------+
   |      +-----+                        |                    |          |          |
   |                                     |                    v          v          v
+----+    +-----+            +----+    +====+    +====+    +-----+    +-----+    +-----+
| P1 |--->| EP2 |            | P1 |--->| RB |<---| SB |    | EP1 |    | EP2 |    | EP3 |
+----+    +-----+            +----+    +====+    +====+    +-----+    +-----+    +-----+
   |                              claim      get    ^         |          |          |
   |      +-----+                                   |         |          |          |
   +----->| EP3 |                                   +---------+----------+----------+
          +-----+                                                 waitFor
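With several EventProcessors reading the same ring buffer, "track to prevent wrap" means the publisher must not lap the slowest of them. The arithmetic can be sketched as below. `WrapCheckSketch` and its method names are hypothetical and chosen for illustration; they are not Disruptor API.

```java
// Illustrative wrap check against the slowest consumer. Not the Disruptor's sequencer.
final class WrapCheckSketch {
    // Lowest sequence among the consumers, or Long.MAX_VALUE when there are none.
    static long minimumSequence(long[] consumerSequences) {
        long minimum = Long.MAX_VALUE;
        for (long sequence : consumerSequences) {
            minimum = Math.min(minimum, sequence);
        }
        return minimum;
    }

    // Writing nextSequence reuses the slot that held (nextSequence - bufferSize),
    // so that older sequence must already have been consumed by every consumer.
    static boolean canClaim(long nextSequence, int bufferSize, long[] consumerSequences) {
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint <= minimumSequence(consumerSequences);
    }

    public static void main(String[] args) {
        long[] consumers = {5, 3, 7};                        // last sequence each EP has processed
        System.out.println(canClaim(11, 8, consumers));      // prints true: slot of sequence 3 is free
        System.out.println(canClaim(12, 8, consumers));      // prints false: sequence 4 not yet seen by all
    }
}
```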
Replicate a message then fold back the results
          +-----+                               track to prevent wrap
   +----->| EP1 |-----+                   +-------------------------------+
   |      +-----+     |                   |                               |
   |                  v                   |                               v
+----+             +-----+   +----+    +====+               +=====+    +-----+
| P1 |             | EP3 |   | P1 |--->| RB |<--------------| SB2 |<---| EP3 |
+----+             +-----+   +----+    +====+               +=====+    +-----+
   |                  ^           claim   ^  get               |   waitFor
   |      +-----+     |                   |                    |
   +----->| EP2 |-----+                +=====+    +-----+      |
          +-----+                      | SB1 |<---| EP1 |<-----+
                                       +=====+    +-----+      |
                                          ^                    |
                                          |       +-----+      |
                                          +-------| EP2 |<-----+
                                         waitFor  +-----+
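In the fold-back arrangement, EP1 and EP2 both read each event as soon as it is published, and EP3 handles an event only after both of them have finished with it. A minimal model of that dependency, assuming an unbounded buffer and using hypothetical names (`DiamondSketch`, `awaitAll`) that are not part of the Disruptor API, might look like this:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative replicate-then-fold-back topology. Not the Disruptor implementation.
final class DiamondSketch {
    static final class Event {
        long input;      // written by the publisher
        long doubled;    // written by EP1 only
        long squared;    // written by EP2 only
    }

    // Returns once every upstream sequence has reached the requested sequence.
    static void awaitAll(long sequence, AtomicLong... upstream) {
        for (AtomicLong dependency : upstream) {
            while (dependency.get() < sequence) {
                Thread.yield();
            }
        }
    }

    // Publishes 1..count and returns the sum of (2 * n + n * n) as folded by EP3.
    static long run(int count) {
        Event[] events = new Event[count];
        for (int i = 0; i < count; i++) {
            events[i] = new Event();                         // preallocate, as a ring buffer would
        }
        AtomicLong cursor = new AtomicLong(-1);
        AtomicLong ep1 = new AtomicLong(-1);
        AtomicLong ep2 = new AtomicLong(-1);
        long[] total = new long[1];

        Thread first = new Thread(() -> {
            for (int s = 0; s < count; s++) {
                awaitAll(s, cursor);
                events[s].doubled = 2 * events[s].input;
                ep1.set(s);
            }
        });
        Thread second = new Thread(() -> {
            for (int s = 0; s < count; s++) {
                awaitAll(s, cursor);
                events[s].squared = events[s].input * events[s].input;
                ep2.set(s);
            }
        });
        Thread fold = new Thread(() -> {
            for (int s = 0; s < count; s++) {
                awaitAll(s, ep1, ep2);                       // gated on both EP1 and EP2
                total[0] += events[s].doubled + events[s].squared;
            }
        });

        Thread[] processors = {first, second, fold};
        for (Thread processor : processors) {
            processor.start();
        }
        for (int s = 0; s < count; s++) {
            events[s].input = s + 1;
            cursor.set(s);                                   // publish
        }
        for (Thread processor : processors) {
            try {
                processor.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(run(10));                         // prints 495
    }
}
```

EP1 and EP2 write to different fields of the same event, which is what lets them run in parallel without locking in this sketch.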
Code Example
    // Event holder for data to be exchanged
    public final class ValueEvent
    {
        private long value;

        public long getValue()
        {
            return value;
        }

        public void setValue(final long value)
        {
            this.value = value;
        }

        public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
        {
            @Override
            public ValueEvent newInstance()
            {
                return new ValueEvent();
            }
        };
    }

    // Callback handler which can be implemented by EventProcessors
    final EventHandler<ValueEvent> eventHandler = new EventHandler<ValueEvent>()
    {
        @Override
        public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch)
            throws Exception
        {
            // process a new event as it becomes available.
        }
    };

    // Single-publisher ring buffer; BUFFER_SIZE must be a power of 2
    RingBuffer<ValueEvent> ringBuffer =
        RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY,
                                        BUFFER_SIZE,
                                        new SleepingWaitStrategy());

    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    BatchEventProcessor<ValueEvent> batchProcessor =
        new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, eventHandler);
    // Gate the publisher on the processor's sequence so unprocessed events are not overwritten
    ringBuffer.addGatingSequences(batchProcessor.getSequence());

    // Each processor runs on a separate thread
    EXECUTOR.submit(batchProcessor);

    // Publishers claim events in sequence
    long sequence = ringBuffer.next();
    try
    {
        ValueEvent event = ringBuffer.get(sequence);
        event.setValue(1234);
    }
    finally
    {
        // publish the event so it is available to EventProcessors
        ringBuffer.publish(sequence);
    }
Class
    Description
AbstractSequencer
    Base class for the various sequencer types (single/multi).
AggregateEventHandler<T>
    An aggregate collection of EventHandlers that get called in sequence for each event.
AlertException
    Used to alert EventProcessors waiting at a SequenceBarrier of status changes.
BatchEventProcessor<T>
    Convenience class for handling the batching semantics of consuming entries from a RingBuffer and delegating the available events to an EventHandler.
BlockingWaitStrategy
    Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
BusySpinWaitStrategy
    Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
Cursored
    Implementors of this interface must provide a single long value that represents their current cursor value.
DataProvider<T>
EventFactory<T>
    Called by the RingBuffer to pre-populate all the events to fill the RingBuffer.
EventHandler<T>
    Callback interface to be implemented for processing events as they become available in the RingBuffer.
EventPoller<T>
    Experimental poll-based interface for the Disruptor.
EventProcessor
    An EventProcessor needs to be an implementation of a runnable that will poll for events from the RingBuffer using the appropriate wait strategy.
EventSink<E>
EventTranslator<T>
    Implementations translate (write) data representations into events claimed from the RingBuffer.
EventTranslatorOneArg<T,A>
    Implementations translate other data representations into events claimed from the RingBuffer.
EventTranslatorThreeArg<T,A,B,C>
    Implementations translate other data representations into events claimed from the RingBuffer.
EventTranslatorTwoArg<T,A,B>
    Implementations translate other data representations into events claimed from the RingBuffer.
EventTranslatorVararg<T>
    Implementations translate other data representations into events claimed from the RingBuffer.
ExceptionHandler<T>
    Callback handler for uncaught exceptions in the event processing cycle of the BatchEventProcessor.
ExceptionHandlers
    Provides static methods for accessing a default ExceptionHandler object.
FatalExceptionHandler
    Convenience implementation of an exception handler that uses standard JDK logging to log the exception as Level.SEVERE and re-throws it wrapped in a RuntimeException.
FixedSequenceGroup
    Hides a group of Sequences behind a single Sequence.
IgnoreExceptionHandler
    Convenience implementation of an exception handler that uses standard JDK logging to log the exception as Level.INFO.
InsufficientCapacityException
    Exception thrown when it is not possible to insert a value into the ring buffer without it wrapping the consuming sequences.
LifecycleAware
    Implement this interface in your EventHandler to be notified when a thread for the BatchEventProcessor starts and shuts down.
LiteBlockingWaitStrategy
    Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
LiteTimeoutBlockingWaitStrategy
    Variation of the TimeoutBlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
MultiProducerSequencer
    Coordinator for claiming sequences for access to a data structure while tracking dependent Sequences.
NoOpEventProcessor
    No operation version of an EventProcessor that simply tracks a Sequence.
NoOpEventProcessor.SequencerFollowingSequence
    Sequence that follows (by wrapping) another sequence.
PhasedBackoffWaitStrategy
    Phased wait strategy for waiting EventProcessors on a barrier.
ProcessingSequenceBarrier
    SequenceBarrier handed out for gating EventProcessors on a cursor sequence and optional dependent EventProcessor(s), using the given WaitStrategy.
RingBuffer<E>
    Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.
Sequence
    Concurrent sequence class used for tracking the progress of the ring buffer and event processors.
SequenceBarrier
    Coordination barrier for tracking the cursor for publishers and sequence of dependent EventProcessors for processing a data structure.
SequenceGroups
    Provides static methods for managing a SequenceGroup object.
Sequencer
    Coordinates claiming sequences for access to a data structure while tracking dependent Sequences.
SequenceReportingEventHandler<T>
    Used by the BatchEventProcessor to set a callback allowing the EventHandler to notify when it has finished consuming an event if this happens after the EventHandler.onEvent(Object, long, boolean) call.
SingleProducerSequencer
    Coordinator for claiming sequences for access to a data structure while tracking dependent Sequences.
SleepingWaitStrategy
    Sleeping strategy that initially spins, then uses a Thread.yield(), and eventually sleeps (LockSupport.parkNanos(n)) for the minimum number of nanos the OS and JVM will allow while the EventProcessors are waiting on a barrier.
WaitStrategy
    Strategy employed for making EventProcessors wait on a cursor Sequence.
WorkerPool<T>
    WorkerPool contains a pool of WorkProcessors that will consume sequences so jobs can be farmed out across a pool of workers.
WorkHandler<T>
    Callback interface to be implemented for processing units of work as they become available in the RingBuffer.
WorkProcessor<T>
    A WorkProcessor wraps a single WorkHandler, effectively consuming the sequence and ensuring appropriate barriers.
YieldingWaitStrategy
    Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after initially spinning.
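The spin, then yield, then sleep progression described for the sleeping strategy can be sketched in plain Java as follows. This is an illustration of the general idea only: `BackoffWaitSketch`, its `waitFor` signature and the two retry thresholds are assumptions made for the example and do not reproduce the library's `WaitStrategy` interface or its tuning.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Illustrative phased back-off while waiting for a sequence. Not the Disruptor's WaitStrategy.
final class BackoffWaitSketch {
    private static final int SPIN_TRIES = 100;    // hypothetical threshold, for illustration only
    private static final int YIELD_TRIES = 100;   // hypothetical threshold, for illustration only

    // Blocks until cursor >= sequence and returns the cursor value that was observed.
    static long waitFor(long sequence, AtomicLong cursor) {
        int remaining = SPIN_TRIES + YIELD_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (remaining > YIELD_TRIES) {
                remaining--;                       // phase 1: busy spin
            } else if (remaining > 0) {
                remaining--;
                Thread.yield();                    // phase 2: give up the time slice
            } else {
                LockSupport.parkNanos(1L);         // phase 3: sleep for as short a time as the OS allows
            }
        }
        return available;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(-1);
        Thread publisher = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            cursor.set(5);
        });
        publisher.start();
        System.out.println(waitFor(3, cursor));    // prints 5
    }
}
```

Returning the observed cursor rather than the requested sequence lets the caller process every event up to that point in one batch.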