Interface RingBuffer
- All Known Implementing Classes: ManyToOneRingBuffer, OneToOneRingBuffer
-
Field Summary
Fields
| Modifier and Type | Field | Description |
| static final int | INSUFFICIENT_CAPACITY | Buffer has insufficient capacity to record a message or satisfy a tryClaim(int, int) request. |
| static final int | PADDING_MSG_TYPE_ID | Record type is padding to prevent fragmentation in the buffer. |
-
Method Summary
| Modifier and Type | Method | Description |
| void | abort(int index) | Abort claim and allow consumer to proceed after the claimed length. |
| AtomicBuffer | buffer() | Get the underlying buffer used by the RingBuffer for storage. |
| int | capacity() | Get the capacity of the ring-buffer in bytes for exchange. |
| void | commit(int index) | Commit message that was written in the previously claimed space thus making it available to the consumer. |
| long | consumerHeartbeatTime() | The time of the last consumer heartbeat. |
| void | consumerHeartbeatTime(long time) | Set the time of the last consumer heartbeat. |
| long | consumerPosition() | The position in bytes from start up for the consumers. |
| int | controlledRead(ControlledMessageHandler handler) | Read as many messages as are available to the end of the ring buffer with the handler able to control progress. |
| int | controlledRead(ControlledMessageHandler handler, int messageCountLimit) | Read as many messages as are available to the end of the ring buffer, up to a supplied limit, with the handler able to control progress. |
| int | maxMsgLength() | The maximum message length in bytes supported by the underlying ring buffer. |
| long | nextCorrelationId() | Get the next value that can be used for a correlation id on a message when a response needs to be correlated. |
| long | producerPosition() | The position in bytes from start up of the producers. |
| int | read(MessageHandler handler) | Read as many messages as are available to the end of the ring buffer. |
| int | read(MessageHandler handler, int messageCountLimit) | Read as many messages as are available to the end of the ring buffer, up to a supplied maximum. |
| int | size() | Size of the buffer backlog in bytes between producers and consumers. |
| int | tryClaim(int msgTypeId, int length) | Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. |
| boolean | unblock() | Unblock a multi-producer ring buffer when a producer has died during the act of offering. |
| boolean | write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length) | Non-blocking write of a message to an underlying ring-buffer. |
-
Field Details
-
PADDING_MSG_TYPE_ID
static final int PADDING_MSG_TYPE_ID
Record type is padding to prevent fragmentation in the buffer.
-
INSUFFICIENT_CAPACITY
static final int INSUFFICIENT_CAPACITY
Buffer has insufficient capacity to record a message or satisfy a tryClaim(int, int) request.
-
-
Method Details
-
capacity
int capacity()
Get the capacity of the ring-buffer in bytes for exchange.
Returns:
the capacity of the ring-buffer in bytes for exchange.
-
write
boolean write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length)
Non-blocking write of a message to an underlying ring-buffer.
Parameters:
msgTypeId - type of the message encoding.
srcBuffer - containing the encoded binary message.
offset - at which the encoded message begins.
length - of the encoded message in bytes.
Returns:
true if written to the ring-buffer, or false if insufficient space exists.
Throws:
IllegalArgumentException - if the length is negative or is greater than maxMsgLength().
-
tryClaim
int tryClaim(int msgTypeId, int length)
Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. Once the message has been written then commit(int) should be called thus making it available to be consumed. Alternatively a claim can be aborted using the abort(int) method.
Claiming a space in the ring-buffer means that the consumer will not be able to consume past the claim until the claimed space is either committed or aborted. Producers will be able to write messages even when outstanding claims exist.
An example of using tryClaim:
    final RingBuffer ringBuffer = ...;
    final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
    if (index > 0) {
        try {
            final AtomicBuffer buffer = ringBuffer.buffer();
            // Work with the buffer directly using the index
            ...
        } finally {
            ringBuffer.commit(index); // commit message
        }
    }
Ensure that claimed space is released even in case of an exception:
    final RingBuffer ringBuffer = ...;
    final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
    if (index > 0) {
        try {
            final AtomicBuffer buffer = ringBuffer.buffer();
            // Work with the buffer directly using the index
            ...
            ringBuffer.commit(index); // commit message
        } catch (final Exception ex) {
            ringBuffer.abort(index); // allow consumer to proceed
            ...
        }
    }
Parameters:
msgTypeId - type of the message encoding. Will be written into the header upon successful claim.
length - of the claim in bytes. A claim length cannot be greater than maxMsgLength().
Returns:
a non-zero index into the underlying ring-buffer at which the encoded message begins, otherwise returns INSUFFICIENT_CAPACITY indicating that there is not enough free space in the buffer.
Throws:
IllegalArgumentException - if the msgTypeId is less than 1.
IllegalArgumentException - if the length is negative or is greater than maxMsgLength().
-
commit
void commit(int index)
Commit message that was written in the previously claimed space thus making it available to the consumer.
Parameters:
index - at which the encoded message begins, i.e. value returned from the tryClaim(int, int) call.
Throws:
IllegalArgumentException - if the index is out of bounds.
IllegalStateException - if this method is called after abort(int) or was already invoked for the given index.
-
abort
void abort(int index)
Abort claim and allow consumer to proceed after the claimed length. Aborting turns unused space into padding, i.e. changes type of the message to PADDING_MSG_TYPE_ID.
Parameters:
index - at which the encoded message begins, i.e. value returned from the tryClaim(int, int) call.
Throws:
IllegalArgumentException - if the index is out of bounds.
IllegalStateException - if this method is called after commit(int) or was already invoked for the given index.
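The ordering rules described for tryClaim, commit and abort can be illustrated with a toy model. The sketch below is a single-threaded simplification and not the library's implementation: the class ClaimModel, its slot-based indices and its read() method are hypothetical names introduced here, with list slots standing in for byte ranges in the real buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy, single-threaded model of claim ordering. Hypothetical; not the library's code. */
final class ClaimModel {
    enum State { CLAIMED, COMMITTED, ABORTED }

    private final List<State> records = new ArrayList<>();
    private int consumerIndex = 0;

    /** Claim the next slot and return its index (a slot stands in for a byte range). */
    int tryClaim() {
        records.add(State.CLAIMED);
        return records.size() - 1;
    }

    /** Make the claimed slot visible to the consumer as a message. */
    void commit(final int index) {
        transition(index, State.COMMITTED);
    }

    /** Release the claimed slot as padding so the consumer can skip it. */
    void abort(final int index) {
        transition(index, State.ABORTED);
    }

    private void transition(final int index, final State next) {
        if (index < 0 || index >= records.size()) {
            throw new IllegalArgumentException("index out of bounds: " + index);
        }
        if (records.get(index) != State.CLAIMED) {
            throw new IllegalStateException("slot already " + records.get(index));
        }
        records.set(index, next);
    }

    /** Consume in order, stopping at the first outstanding claim. Returns messages delivered. */
    int read() {
        int delivered = 0;
        while (consumerIndex < records.size() && records.get(consumerIndex) != State.CLAIMED) {
            if (records.get(consumerIndex) == State.COMMITTED) {
                delivered++; // aborted slots are skipped like padding
            }
            consumerIndex++;
        }
        return delivered;
    }
}
```

In this model a later commit does not become readable while an earlier claim is still outstanding, and a second commit or abort on the same index is rejected with IllegalStateException.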
-
read
int read(MessageHandler handler)
Read as many messages as are available to the end of the ring buffer.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
Parameters:
handler - to be called for processing each message in turn.
Returns:
the number of messages that have been processed.
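Because a read may return early at a wrap or padding record, a caller that wants to drain the buffer typically retries while size() still reports a backlog. The following is a minimal sketch of that pattern under stated assumptions: BacklogSource is a hypothetical stand-in exposing only read and size, WrappingFake is a fake used for demonstration, and the maxAttempts bound is an addition here to avoid spinning if the backlog cannot be consumed yet.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.IntConsumer;

/** Hypothetical stand-in exposing only the two calls this sketch needs. */
interface BacklogSource {
    int read(IntConsumer handler); // returns the number of messages processed
    int size();                    // backlog remaining
}

final class DrainLoop {
    /** Retry read() while a backlog remains, up to maxAttempts. Returns messages processed. */
    static int drain(final BacklogSource source, final IntConsumer handler, final int maxAttempts) {
        int total = 0;
        for (int attempt = 0; attempt < maxAttempts && source.size() > 0; attempt++) {
            total += source.read(handler);
        }
        return total;
    }
}

/** Fake source that serves one batch per read, simulating early returns. Demonstration only. */
final class WrappingFake implements BacklogSource {
    private final Deque<int[]> batches = new ArrayDeque<>();

    WrappingFake(final int[]... batchesToServe) {
        for (final int[] batch : batchesToServe) {
            batches.add(batch);
        }
    }

    @Override
    public int read(final IntConsumer handler) {
        final int[] batch = batches.poll();
        if (batch == null) {
            return 0;
        }
        for (final int message : batch) {
            handler.accept(message);
        }
        return batch.length;
    }

    @Override
    public int size() {
        int remaining = 0;
        for (final int[] batch : batches) {
            remaining += batch.length;
        }
        return remaining;
    }
}
```

An empty batch in the fake plays the role of a read that only skipped padding: it delivers nothing, yet the backlog is still non-zero, so the loop tries again.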
-
read
int read(MessageHandler handler, int messageCountLimit)
Read as many messages as are available to the end of the ring buffer, up to a supplied maximum.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
Parameters:
handler - to be called for processing each message in turn.
messageCountLimit - the maximum number of messages that will be read in a single invocation.
Returns:
the number of messages that have been processed.
-
controlledRead
int controlledRead(ControlledMessageHandler handler)
Read as many messages as are available to the end of the ring buffer with the handler able to control progress.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
Parameters:
handler - to be called for processing each message in turn which will return how to progress.
Returns:
the number of messages that have been processed.
-
controlledRead
int controlledRead(ControlledMessageHandler handler, int messageCountLimit)
Read as many messages as are available to the end of the ring buffer, up to a supplied limit, with the handler able to control progress.
If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.
Parameters:
handler - to be called for processing each message in turn which will return how to progress.
messageCountLimit - the maximum number of messages that will be read in a single invocation.
Returns:
the number of messages that have been processed.
-
maxMsgLength
int maxMsgLength()
The maximum message length in bytes supported by the underlying ring buffer.
Returns:
the maximum message length in bytes supported by the underlying ring buffer.
-
nextCorrelationId
long nextCorrelationId()
Get the next value that can be used for a correlation id on a message when a response needs to be correlated.
This method should be thread safe.
Returns:
the next value in the correlation sequence.
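One way to meet the thread-safety requirement is an atomic counter. The sketch below is an illustration only and assumes an in-process java.util.concurrent.atomic.AtomicLong; the class name CorrelationIds is hypothetical, and nothing here describes how the implementing classes actually store the sequence.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-process correlation id source; not the library's implementation. */
final class CorrelationIds {
    private final AtomicLong next = new AtomicLong();

    /** Atomically return the current value and advance, so concurrent callers never share an id. */
    long nextCorrelationId() {
        return next.getAndIncrement();
    }
}
```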
-
buffer
AtomicBuffer buffer()
Get the underlying buffer used by the RingBuffer for storage.
Returns:
the underlying buffer used by the RingBuffer for storage.
-
consumerHeartbeatTime
void consumerHeartbeatTime(long time)
Set the time of the last consumer heartbeat.
Note: The value for time must be valid across processes which means System.nanoTime() is not a valid option.
Parameters:
time - of the last consumer heartbeat.
-
consumerHeartbeatTime
long consumerHeartbeatTime()
The time of the last consumer heartbeat.
Returns:
the time of the last consumer heartbeat.
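System.nanoTime() has an arbitrary origin that can differ between JVM instances, which is why it cannot be compared across processes. A wall-clock epoch time such as System.currentTimeMillis() is one option that can. The sketch below assumes heartbeats are recorded in epoch milliseconds; the class HeartbeatCheck, its methods and the timeout parameter are hypothetical names for illustration.

```java
/** Hypothetical liveness helper assuming heartbeats are stored as epoch milliseconds. */
final class HeartbeatCheck {
    /** A timestamp that is comparable across processes on the same machine clock. */
    static long epochNowMs() {
        return System.currentTimeMillis();
    }

    /** True if the last heartbeat is no older than timeoutMs relative to nowMs. */
    static boolean isConsumerAlive(final long lastHeartbeatMs, final long nowMs, final long timeoutMs) {
        return nowMs - lastHeartbeatMs <= timeoutMs;
    }
}
```

A consumer would pass HeartbeatCheck.epochNowMs() to consumerHeartbeatTime(long), and an observer in another process would compare the value read back against its own epoch clock.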
-
producerPosition
long producerPosition()
The position in bytes from start up of the producers. The figure includes the headers. This is the range the producers have claimed, although they may still be in the act of writing to it.
Returns:
number of bytes produced by the producers in claimed space.
-
consumerPosition
long consumerPosition()
The position in bytes from start up for the consumers. The figure includes the headers.
Returns:
the count of bytes consumed by the consumers.
-
size
int size()
Size of the buffer backlog in bytes between producers and consumers. The value includes the size of headers.
This method gives a concurrent snapshot of the buffer whereby a concurrent read or write may be partially complete and thus the value should be taken as an indication.
Returns:
size of the backlog of bytes in the buffer between producers and consumers.
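The relationship between the backlog and the two positions can be sketched as a subtraction. This is an assumption drawn from the descriptions above, not a statement of how the implementing classes compute size(); the class Backlog and the clamping to the range 0 to capacity are illustrative choices for handling positions sampled at slightly different moments.

```java
/** Hypothetical helper estimating backlog from separately sampled positions. */
final class Backlog {
    /** Distance between producers and consumers in bytes, clamped to [0, capacity]. */
    static int backlog(final long producerPosition, final long consumerPosition, final int capacity) {
        final long size = producerPosition - consumerPosition;
        if (size < 0) {
            return 0;        // consumer sample was taken after the producer sample
        }
        if (size > capacity) {
            return capacity; // producer sample ran ahead of a stale consumer sample
        }
        return (int)size;
    }
}
```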
-
unblock
boolean unblock()
Unblock a multi-producer ring buffer when a producer has died during the act of offering. The operation will scan from the consumer position up to the producer position.
If no action is required at the position then none will be taken.
Returns:
true if an unblocking action was taken, otherwise false.
-