Class ManyToOneRingBuffer
- All Implemented Interfaces:
RingBuffer
-
Field Summary
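Fields
Modifier and Type | Field | Description
private final AtomicBuffer | buffer |
private final int | capacity |
private final int | consumerHeartbeatIndex |
private final int | correlationIdCounterIndex |
private final int | headCachePositionIndex |
private final int | headPositionIndex |
private final int | maxMsgLength |
static final int | MIN_CAPACITY | Minimal required capacity of the ring buffer excluding RingBufferDescriptor.TRAILER_LENGTH.
private final int | tailPositionIndex |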
Fields inherited from interface org.agrona.concurrent.ringbuffer.RingBuffer
INSUFFICIENT_CAPACITY, PADDING_MSG_TYPE_ID
-
Constructor Summary
Constructors
Constructor | Description
ManyToOneRingBuffer(AtomicBuffer buffer) | Construct a new RingBuffer based on an underlying AtomicBuffer.
-
Method Summary
Modifier and Type | Method | Description
void | abort(int index) | Abort claim and allow consumer to proceed after the claimed length.
AtomicBuffer | buffer() | Get the underlying buffer used by the RingBuffer for storage.
int | capacity() | Get the capacity of the ring-buffer in bytes for exchange.
private void | checkMsgLength(int length) |
private int | claimCapacity(AtomicBuffer buffer, int recordLength) |
void | commit(int index) | Commit message that was written in the previously claimed space, thus making it available to the consumer.
private int | computeRecordIndex(int index) |
long | consumerHeartbeatTime() | The time of the last consumer heartbeat.
void | consumerHeartbeatTime(long time) | Set the time of the last consumer heartbeat.
long | consumerPosition() | The position in bytes from start up for the consumers.
int | controlledRead(ControlledMessageHandler handler) | Read as many messages as are available to the end of the ring buffer with the handler able to control progress.
int | controlledRead(ControlledMessageHandler handler, int messageCountLimit) | Read messages, up to a supplied limit, that are available to the end of the ring buffer with the handler able to control progress.
int | maxMsgLength() | The maximum message length in bytes supported by the underlying ring buffer.
long | nextCorrelationId() | Get the next value that can be used for a correlation id on a message when a response needs to be correlated.
long | producerPosition() | The position in bytes from start up of the producers.
int | read(MessageHandler handler) | Read as many messages as are available to the end of the ring buffer.
int | read(MessageHandler handler, int messageCountLimit) | Read as many messages as are available to the end of the ring buffer, up to a supplied maximum.
private static boolean | scanBackToConfirmStillZeroed(AtomicBuffer buffer, int from, int limit) |
int | size() | Size of the buffer backlog in bytes between producers and consumers.
int | tryClaim(int msgTypeId, int length) | Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics.
boolean | unblock() | Unblock a multi-producer ring buffer when a producer has died during the act of offering.
private int | verifyClaimedSpaceNotReleased(AtomicBuffer buffer, int recordIndex) |
boolean | write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length) | Non-blocking write of a message to an underlying ring-buffer.
-
Field Details
-
MIN_CAPACITY
public static final int MIN_CAPACITY
Minimal required capacity of the ring buffer excluding RingBufferDescriptor.TRAILER_LENGTH.
-
capacity
private final int capacity
-
maxMsgLength
private final int maxMsgLength
-
tailPositionIndex
private final int tailPositionIndex
-
headCachePositionIndex
private final int headCachePositionIndex
-
headPositionIndex
private final int headPositionIndex
-
correlationIdCounterIndex
private final int correlationIdCounterIndex
-
consumerHeartbeatIndex
private final int consumerHeartbeatIndex
-
buffer
private final AtomicBuffer buffer
-
-
Constructor Details
-
ManyToOneRingBuffer
public ManyToOneRingBuffer(AtomicBuffer buffer)
Construct a new RingBuffer based on an underlying AtomicBuffer. The underlying buffer must be a power of 2 in size plus sufficient space for the RingBufferDescriptor.TRAILER_LENGTH.
- Parameters:
buffer - via which events will be exchanged.
- Throws:
IllegalArgumentException - if the buffer capacity is not a power of 2 plus RingBufferDescriptor.TRAILER_LENGTH or if capacity is less than MIN_CAPACITY.
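The sizing rule for the constructor can be checked before a buffer is allocated. The following is a minimal sketch using only the JDK; it does not call Agrona. The class name `RingBufferSizing` and the `trailerLength` parameter are illustrative assumptions, with `trailerLength` standing in for RingBufferDescriptor.TRAILER_LENGTH, and the MIN_CAPACITY check is left out.

```java
/** Hypothetical helper illustrating the "power of 2 plus trailer" sizing rule; not part of Agrona. */
final class RingBufferSizing {
    /** Returns true when value is a positive power of two. */
    public static boolean isPowerOfTwo(final int value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Total buffer length to allocate for a given message capacity. */
    public static int totalLength(final int capacity, final int trailerLength) {
        if (!isPowerOfTwo(capacity)) {
            throw new IllegalArgumentException("capacity must be a positive power of 2: " + capacity);
        }
        return capacity + trailerLength;
    }

    /** Usable capacity of an existing buffer, or an exception if the length breaks the rule. */
    public static int usableCapacity(final int totalLength, final int trailerLength) {
        final int capacity = totalLength - trailerLength;
        if (!isPowerOfTwo(capacity)) {
            throw new IllegalArgumentException(
                "length minus trailer must be a positive power of 2: " + capacity);
        }
        return capacity;
    }
}
```

In real code the value returned by `totalLength` would be the size of the AtomicBuffer passed to the constructor, with the actual trailer constant substituted for the assumed parameter.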
-
-
Method Details
-
capacity
public int capacity()
Get the capacity of the ring-buffer in bytes for exchange.
- Specified by:
capacity in interface RingBuffer
- Returns:
the capacity of the ring-buffer in bytes for exchange.
-
write
public boolean write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length)
Non-blocking write of a message to an underlying ring-buffer.
- Specified by:
write in interface RingBuffer
- Parameters:
msgTypeId - type of the message encoding.
srcBuffer - containing the encoded binary message.
offset - at which the encoded message begins.
length - of the encoded message in bytes.
- Returns:
true if written to the ring-buffer, or false if insufficient space exists.
-
tryClaim
public int tryClaim(int msgTypeId, int length)
Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. Once the message has been written, RingBuffer.commit(int) should be called, thus making it available to be consumed. Alternatively, a claim can be aborted using the RingBuffer.abort(int) method.
Claiming a space in the ring-buffer means that the consumer will not be able to consume past the claim until the claimed space is either committed or aborted. Producers will be able to write messages even when outstanding claims exist.
An example of using tryClaim:
    final RingBuffer ringBuffer = ...;
    final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
    if (index > 0) {
        try {
            final AtomicBuffer buffer = ringBuffer.buffer();
            // Work with the buffer directly using the index
            ...
        } finally {
            ringBuffer.commit(index); // commit message
        }
    }
Ensure that claimed space is released even in case of an exception:
    final RingBuffer ringBuffer = ...;
    final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
    if (index > 0) {
        try {
            final AtomicBuffer buffer = ringBuffer.buffer();
            // Work with the buffer directly using the index
            ...
            ringBuffer.commit(index); // commit message
        } catch (final Exception ex) {
            ringBuffer.abort(index); // allow consumer to proceed
            ...
        }
    }
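The ordering rule described for claims (the consumer cannot pass an outstanding claim, while producers can keep claiming) can be illustrated with a small model. This is a hypothetical, single-threaded sketch using only the JDK; `ClaimModel` and its `State` enum are invented for illustration, indices are list positions rather than buffer offsets, and nothing here reflects how the class stores records.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded model of claim/commit/abort ordering; not Agrona code. */
final class ClaimModel {
    enum State { CLAIMED, COMMITTED, PADDING }

    private final List<State> records = new ArrayList<>();
    private int head = 0; // position of the next record the consumer will inspect

    /** Claims the next record slot and returns its position in the model. */
    public int tryClaim() {
        records.add(State.CLAIMED);
        return records.size() - 1;
    }

    /** Marks a claimed record as a message that the consumer may read. */
    public void commit(final int index) {
        transition(index, State.COMMITTED);
    }

    /** Marks a claimed record as padding that the consumer will skip. */
    public void abort(final int index) {
        transition(index, State.PADDING);
    }

    private void transition(final int index, final State next) {
        if (records.get(index) != State.CLAIMED) {
            throw new IllegalStateException("record " + index + " is not an outstanding claim");
        }
        records.set(index, next);
    }

    /** Consumes records in order, stopping at the first outstanding claim. */
    public int read() {
        int delivered = 0;
        while (head < records.size() && records.get(head) != State.CLAIMED) {
            if (records.get(head) == State.COMMITTED) {
                delivered++; // padding records advance the head without being delivered
            }
            head++;
        }
        return delivered;
    }
}
```

In the model, a later record that is already committed stays invisible to `read()` until every earlier claim is committed or aborted, which is why an abandoned claim has to be released.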
- Specified by:
tryClaim in interface RingBuffer
- Parameters:
msgTypeId - type of the message encoding. Will be written into the header upon successful claim.
length - of the claim in bytes. A claim length cannot be greater than RingBuffer.maxMsgLength().
- Returns:
a non-zero index into the underlying ring-buffer at which encoded message begins, otherwise returns RingBuffer.INSUFFICIENT_CAPACITY indicating that there is not enough free space in the buffer.
-
commit
public void commit(int index)
Commit message that was written in the previously claimed space, thus making it available to the consumer.
- Specified by:
commit in interface RingBuffer
- Parameters:
index - at which the encoded message begins, i.e. value returned from the RingBuffer.tryClaim(int, int) call.
-
abort
public void abort(int index)
Abort claim and allow consumer to proceed after the claimed length. Aborting turns unused space into padding, i.e. changes type of the message to RingBuffer.PADDING_MSG_TYPE_ID.
- Specified by:
abort in interface RingBuffer
- Parameters:
index - at which the encoded message begins, i.e. value returned from the RingBuffer.tryClaim(int, int) call.
-
read
public int read(MessageHandler handler)
Read as many messages as are available to the end of the ring buffer.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
- Specified by:
read in interface RingBuffer
- Parameters:
handler - to be called for processing each message in turn.
- Returns:
the number of messages that have been processed.
-
read
public int read(MessageHandler handler, int messageCountLimit)
Read as many messages as are available to the end of the ring buffer, up to a supplied maximum.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
- Specified by:
read in interface RingBuffer
- Parameters:
handler - to be called for processing each message in turn.
messageCountLimit - the maximum number of messages that will be read in a single invocation.
- Returns:
the number of messages that have been processed.
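Because a single read may return before the backlog is empty, callers typically poll again while size() reports remaining bytes. The following is a minimal sketch of such a loop using only the JDK. The `Source` interface, the `QueueSource` stand-in, and the `maxPolls` bound are all assumptions made for illustration: `Source` only mimics the shape of the read and size pair, and `QueueSource.size()` counts queued messages rather than bytes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical drain loop over a read/size pair; not Agrona's interface. */
final class DrainLoop {
    /** Stand-in for the two operations the loop depends on. */
    interface Source {
        int read(Consumer<String> handler, int messageCountLimit);

        int size();
    }

    /** Polls until no backlog remains or maxPolls is reached; returns messages processed. */
    public static int drain(
        final Source source, final Consumer<String> handler, final int limitPerPoll, final int maxPolls) {
        int total = 0;
        int polls = 0;
        do {
            total += source.read(handler, limitPerPoll);
            polls++;
        } while (source.size() > 0 && polls < maxPolls);
        return total;
    }

    /** Toy in-memory source used only to exercise the loop. */
    static final class QueueSource implements Source {
        private final Queue<String> queue = new ArrayDeque<>();

        public void add(final String message) {
            queue.add(message);
        }

        public int read(final Consumer<String> handler, final int messageCountLimit) {
            int count = 0;
            while (count < messageCountLimit && !queue.isEmpty()) {
                handler.accept(queue.poll());
                count++;
            }
            return count;
        }

        public int size() {
            return queue.size();
        }
    }
}
```

The poll bound matters because a backlog can remain non-empty while nothing is readable, for example behind an uncommitted claim; an unbounded loop would spin in that case.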
-
controlledRead
public int controlledRead(ControlledMessageHandler handler)
Read as many messages as are available to the end of the ring buffer with the handler able to control progress.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
- Specified by:
controlledRead in interface RingBuffer
- Parameters:
handler - to be called for processing each message in turn which will return how to progress.
- Returns:
the number of messages that have been processed.
-
controlledRead
public int controlledRead(ControlledMessageHandler handler, int messageCountLimit)
Read messages, up to a supplied limit, that are available to the end of the ring buffer with the handler able to control progress.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
- Specified by:
controlledRead in interface RingBuffer
- Parameters:
handler - to be called for processing each message in turn which will return how to progress.
messageCountLimit - the maximum number of messages that will be read in a single invocation.
- Returns:
the number of messages that have been processed.
-
maxMsgLength
public int maxMsgLength()
The maximum message length in bytes supported by the underlying ring buffer.
- Specified by:
maxMsgLength in interface RingBuffer
- Returns:
the maximum message length in bytes supported by the underlying ring buffer.
-
nextCorrelationId
public long nextCorrelationId()
Get the next value that can be used for a correlation id on a message when a response needs to be correlated.
This method should be thread safe.
- Specified by:
nextCorrelationId in interface RingBuffer
- Returns:
the next value in the correlation sequence.
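The thread-safety requirement amounts to an atomic fetch-and-add on a shared counter. The following is a minimal sketch using only the JDK, with `java.util.concurrent.atomic.AtomicLong` as a stand-in for a counter kept in the shared buffer; the class name `CorrelationIdSketch` and the starting value of zero are assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical thread-safe correlation id sequence; AtomicLong stands in for a counter in the buffer. */
final class CorrelationIdSketch {
    private final AtomicLong counter = new AtomicLong();

    /** Atomically returns the current value and advances the sequence by one. */
    public long nextCorrelationId() {
        return counter.getAndIncrement();
    }
}
```

Since the increment is a single atomic operation, concurrent callers never observe the same id, which is what lets a response be matched to exactly one request.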
-
buffer
public AtomicBuffer buffer()
Get the underlying buffer used by the RingBuffer for storage.
- Specified by:
buffer in interface RingBuffer
- Returns:
the underlying buffer used by the RingBuffer for storage.
-
consumerHeartbeatTime
public void consumerHeartbeatTime(long time)
Set the time of the last consumer heartbeat.
Note: The value for time must be valid across processes, which means System.nanoTime() is not a valid option.
- Specified by:
consumerHeartbeatTime in interface RingBuffer
- Parameters:
time - of the last consumer heartbeat.
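System.nanoTime() measures from an arbitrary origin that can differ between JVM instances, so a heartbeat written by one process cannot be compared by another. The following is a minimal sketch using only the JDK, assuming wall-clock milliseconds from System.currentTimeMillis() as the shared time source; the class name, method names, and timeout parameter are illustrative, not part of the API.

```java
/** Hypothetical heartbeat helpers based on wall-clock time; not part of Agrona. */
final class ConsumerHeartbeatSketch {
    /** Milliseconds since the epoch, comparable across processes on the same host. */
    public static long heartbeatTimestamp() {
        return System.currentTimeMillis();
    }

    /** Returns true when the last heartbeat is no older than timeoutMs. */
    public static boolean isConsumerAlive(final long lastHeartbeatMs, final long nowMs, final long timeoutMs) {
        return nowMs - lastHeartbeatMs <= timeoutMs;
    }
}
```

A consumer would pass a value such as `heartbeatTimestamp()` to consumerHeartbeatTime(long), and a producer would compare the value read back against its own clock.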
-
consumerHeartbeatTime
public long consumerHeartbeatTime()
The time of the last consumer heartbeat.
- Specified by:
consumerHeartbeatTime in interface RingBuffer
- Returns:
the time of the last consumer heartbeat.
-
producerPosition
public long producerPosition()
The position in bytes from start up of the producers. The figure includes the headers. This is the range the producers have claimed, although they may still be in the act of writing into it.
- Specified by:
producerPosition in interface RingBuffer
- Returns:
number of bytes produced by the producers in claimed space.
-
consumerPosition
public long consumerPosition()
The position in bytes from start up for the consumers. The figure includes the headers.
- Specified by:
consumerPosition in interface RingBuffer
- Returns:
the count of bytes consumed by the consumers.
-
size
public int size()
Size of the buffer backlog in bytes between producers and consumers. The value includes the size of headers.
This method gives a concurrent snapshot of the buffer whereby a concurrent read or write may be partially complete and thus the value should be taken as an indication.
- Specified by:
size in interface RingBuffer
- Returns:
size of the backlog of bytes in the buffer between producers and consumers.
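The backlog is the distance between the producer and consumer positions. Because the two positions are sampled at different instants, their raw difference can briefly fall outside the valid range. The following is a minimal sketch using only the JDK; the class name `BacklogSketch` and the choice to clamp the result to the range from zero to capacity are assumptions, not a description of the actual implementation.

```java
/** Hypothetical backlog calculation from two position snapshots; not Agrona code. */
final class BacklogSketch {
    /** Bytes between the consumer (head) and producer (tail) positions, clamped to [0, capacity]. */
    public static int backlog(final long headPosition, final long tailPosition, final int capacity) {
        final long size = tailPosition - headPosition;
        if (size < 0) {
            return 0; // the head snapshot was taken after the tail snapshot had moved on
        }
        if (size > capacity) {
            return capacity; // the tail snapshot ran ahead of a stale head snapshot
        }
        return (int) size;
    }
}
```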
-
unblock
public boolean unblock()
Unblock a multi-producer ring buffer when a producer has died during the act of offering. The operation will scan from the consumer position up to the producer position.
If no action is required at the position then none will be taken.
- Specified by:
unblock in interface RingBuffer
- Returns:
true if an unblocking action was taken, otherwise false.
-
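scanBackToConfirmStillZeroed
private static boolean scanBackToConfirmStillZeroed(AtomicBuffer buffer, int from, int limit)
-
checkMsgLength
private void checkMsgLength(int length)
-
claimCapacity
private int claimCapacity(AtomicBuffer buffer, int recordLength)
-
computeRecordIndex
private int computeRecordIndex(int index)
-
verifyClaimedSpaceNotReleased
private int verifyClaimedSpaceNotReleased(AtomicBuffer buffer, int recordIndex)
-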