Package org.agrona
Class ExpandableRingBuffer
java.lang.Object
    org.agrona.ExpandableRingBuffer
public class ExpandableRingBuffer extends java.lang.Object
Ring buffer for storing messages which can expand to accommodate the messages written into it. Messages are written and read in FIFO order with capacity up to maxCapacity(). Messages can be iterated via for-each methods without being consumed, with the option to begin iteration at an offset from the current head() position.
Note: This class is not thread safe.
-
-
Nested Class Summary
Nested Classes
static interface ExpandableRingBuffer.MessageConsumer
    Consumers of messages implement this interface and pass it to consume(MessageConsumer, int).
-
Field Summary
Fields
private UnsafeBuffer buffer
private int capacity
private long head
static int HEADER_ALIGNMENT
    Alignment in bytes for the beginning of message header.
static int HEADER_LENGTH
    Length of encapsulating header.
private boolean isDirect
private int mask
static int MAX_CAPACITY
    Maximum capacity to which the ring buffer can grow which is 1 GB.
private int maxCapacity
private static int MESSAGE_LENGTH_OFFSET
private static int MESSAGE_TYPE_DATA
private static int MESSAGE_TYPE_OFFSET
private static int MESSAGE_TYPE_PADDING
private long tail
-
Constructor Summary
Constructors
ExpandableRingBuffer()
    Create a new ring buffer which is initially compact and empty, can grow to MAX_CAPACITY, and uses a direct ByteBuffer.
ExpandableRingBuffer(int initialCapacity, int maxCapacity, boolean isDirect)
    Create a new ring buffer providing configuration for initial and max capacity, plus whether it is direct or not.
-
Method Summary
Methods
boolean append(DirectBuffer srcBuffer, int srcOffset, int srcLength)
    Append a message into the ring buffer, expanding the buffer if required.
int capacity()
    Current capacity of the ring buffer in bytes.
int consume(ExpandableRingBuffer.MessageConsumer messageConsumer, int messageLimit)
    Consume messages up to a limit and pass them to the ExpandableRingBuffer.MessageConsumer.
int forEach(int headOffset, ExpandableRingBuffer.MessageConsumer messageConsumer, int limit)
    Iterate encoded contents and pass messages to the ExpandableRingBuffer.MessageConsumer which can stop by returning false.
int forEach(ExpandableRingBuffer.MessageConsumer messageConsumer, int limit)
    Iterate encoded contents and pass messages to the ExpandableRingBuffer.MessageConsumer which can stop by returning false.
long head()
    Head position in the buffer from which bytes are consumed forward toward the tail().
boolean isDirect()
    Is the ByteBuffer used for backing storage direct, that is off Java heap, or not.
boolean isEmpty()
    Is the ring buffer currently empty.
int maxCapacity()
    The maximum capacity to which the buffer can expand.
void reset(int requiredCapacity)
    Reset the buffer with a new capacity and empty state.
private void resize(int newMessageLength)
int size()
    Size of the ring buffer currently populated in bytes.
long tail()
    Tail position in the buffer at which new bytes are appended.
private void writeMessage(DirectBuffer srcBuffer, int srcOffset, int srcLength)
-
-
-
Field Detail
-
MAX_CAPACITY
public static final int MAX_CAPACITY
Maximum capacity to which the ring buffer can grow, which is 1 GB.
- See Also:
- Constant Field Values
-
HEADER_ALIGNMENT
public static final int HEADER_ALIGNMENT
Alignment in bytes for the beginning of message header.
- See Also:
- Constant Field Values
-
HEADER_LENGTH
public static final int HEADER_LENGTH
Length of encapsulating header.
- See Also:
- Constant Field Values
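The two header constants above suggest that each message is stored with a fixed-length header and starts on an aligned boundary. The following is a minimal sketch of how the space occupied by one framed message could be computed under that reading. The values 8 and 8, the class name FramingSketch, and both method names are assumptions made for illustration; the real values are listed on the Constant Field Values page, and this is not taken from the Agrona source.

```java
final class FramingSketch
{
    // Assumed values for illustration only; consult Constant Field Values for the real ones.
    static final int HEADER_ALIGNMENT = 8;
    static final int HEADER_LENGTH = 8;

    // Round value up to the next multiple of alignment, which must be a power of two.
    static int align(final int value, final int alignment)
    {
        return (value + (alignment - 1)) & -alignment;
    }

    // Bytes a message with the given payload length would occupy once the
    // header is added and the total is rounded up to the alignment.
    static int framedLength(final int messageLength)
    {
        return align(HEADER_LENGTH + messageLength, HEADER_ALIGNMENT);
    }
}
```

Under these assumed values a 1-byte payload would occupy 16 bytes of the ring, so size() can grow faster than the sum of the payload lengths appended.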
-
MESSAGE_LENGTH_OFFSET
private static final int MESSAGE_LENGTH_OFFSET
- See Also:
- Constant Field Values
-
MESSAGE_TYPE_OFFSET
private static final int MESSAGE_TYPE_OFFSET
- See Also:
- Constant Field Values
-
MESSAGE_TYPE_PADDING
private static final int MESSAGE_TYPE_PADDING
- See Also:
- Constant Field Values
-
MESSAGE_TYPE_DATA
private static final int MESSAGE_TYPE_DATA
- See Also:
- Constant Field Values
-
maxCapacity
private final int maxCapacity
-
capacity
private int capacity
-
mask
private int mask
-
head
private long head
-
tail
private long tail
-
buffer
private final UnsafeBuffer buffer
-
isDirect
private final boolean isDirect
-
-
Constructor Detail
-
ExpandableRingBuffer
public ExpandableRingBuffer()
Create a new ring buffer which is initially compact and empty, can grow to MAX_CAPACITY, and uses a direct ByteBuffer.
-
ExpandableRingBuffer
public ExpandableRingBuffer(int initialCapacity, int maxCapacity, boolean isDirect)
Create a new ring buffer providing configuration for initial and max capacity, plus whether it is direct or not.
- Parameters:
initialCapacity - initial capacity required in the buffer.
maxCapacity - maximum capacity to which the buffer can expand.
isDirect - whether the ByteBuffer is allocated direct or heap based.
-
-
Method Detail
-
isDirect
public boolean isDirect()
Is the ByteBuffer used for backing storage direct, that is off Java heap, or not.
- Returns:
- true if a direct ByteBuffer, or false for a heap based ByteBuffer.
-
maxCapacity
public int maxCapacity()
The maximum capacity to which the buffer can expand.
- Returns:
- maximum capacity to which the buffer can expand.
-
capacity
public int capacity()
Current capacity of the ring buffer in bytes.
- Returns:
- the current capacity of the ring buffer in bytes.
-
size
public int size()
Size of the ring buffer currently populated in bytes.
- Returns:
- size of the ring buffer currently populated in bytes.
-
isEmpty
public boolean isEmpty()
Is the ring buffer currently empty.
- Returns:
- true if the ring buffer is empty otherwise false.
-
head
public long head()
Head position in the buffer from which bytes are consumed forward toward the tail().
- Returns:
- head position in the buffer from which bytes are consumed forward towards the tail().
- See Also:
consume(MessageConsumer, int)
-
tail
public long tail()
Tail position in the buffer at which new bytes are appended.
- Returns:
- tail position in the buffer at which new bytes are appended.
- See Also:
append(DirectBuffer, int, int)
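Because head() and tail() return long positions while capacity() is an int, the positions appear to be running counters rather than array indices. The following is a minimal sketch of that bookkeeping, assuming a power-of-two capacity so that a mask (compare the private mask field) maps a position to an index. The class PositionSketch and its methods are hypothetical and model only the arithmetic, not the actual class.

```java
final class PositionSketch
{
    private final int capacity; // assumed to be a power of two
    private final int mask;     // capacity - 1, maps a position to an index
    private long head;          // position from which bytes are consumed
    private long tail;          // position at which new bytes are appended

    PositionSketch(final int capacity)
    {
        if (Integer.bitCount(capacity) != 1)
        {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.capacity = capacity;
        this.mask = capacity - 1;
    }

    // Model an append of the given number of bytes.
    void advanceTail(final int bytes)
    {
        if (bytes < 0 || bytes > capacity - size())
        {
            throw new IllegalArgumentException("insufficient space");
        }
        tail += bytes;
    }

    // Model consuming the given number of bytes.
    void advanceHead(final int bytes)
    {
        if (bytes < 0 || bytes > size())
        {
            throw new IllegalArgumentException("not enough bytes populated");
        }
        head += bytes;
    }

    int size()
    {
        return (int)(tail - head);
    }

    boolean isEmpty()
    {
        return head == tail;
    }

    int headIndex()
    {
        return (int)(head & mask);
    }

    int tailIndex()
    {
        return (int)(tail & mask);
    }
}
```

In this model the positions only ever increase, so the populated size is always tail minus head even after the tail index has wrapped around to the start of the storage.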
-
reset
public void reset(int requiredCapacity)
Reset the buffer with a new capacity and empty state. Buffer capacity will grow or shrink as necessary.
- Parameters:
requiredCapacity - for the ring buffer. If the same as the existing capacity then no adjustment is made.
-
forEach
public int forEach(ExpandableRingBuffer.MessageConsumer messageConsumer, int limit)
Iterate encoded contents and pass messages to the ExpandableRingBuffer.MessageConsumer which can stop by returning false.
- Parameters:
messageConsumer - to which the encoded messages are passed.
limit - for the number of entries to iterate over.
- Returns:
- count of bytes iterated.
- See Also:
ExpandableRingBuffer.MessageConsumer
-
forEach
public int forEach(int headOffset, ExpandableRingBuffer.MessageConsumer messageConsumer, int limit)
Iterate encoded contents and pass messages to the ExpandableRingBuffer.MessageConsumer which can stop by returning false.
- Parameters:
headOffset - offset from head which must be <= tail(), and be the start of a message.
messageConsumer - to which the encoded messages are passed.
limit - for the number of entries to iterate over.
- Returns:
- count of bytes iterated.
- See Also:
ExpandableRingBuffer.MessageConsumer
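The contract described for forEach has three parts: messages are visited without being removed, the callback can stop the iteration by returning false, and the result is a count of bytes rather than of messages. The following is a minimal model of that contract over a plain list. The Handler interface is a hypothetical stand-in and not the signature of ExpandableRingBuffer.MessageConsumer. Two further assumptions are made here: a message whose handler returns false is not counted, and only payload bytes are counted, whereas the real ring buffer may well include header and alignment bytes.

```java
import java.util.List;

final class IterationSketch
{
    // Hypothetical callback used only by this sketch.
    interface Handler
    {
        boolean onMessage(byte[] message);
    }

    // Visit up to limit messages in FIFO order without removing them.
    // Returns the number of payload bytes iterated.
    static int forEach(final List<byte[]> messages, final Handler handler, final int limit)
    {
        int bytes = 0;
        int count = 0;

        for (final byte[] message : messages)
        {
            // Stop at the limit, or when the handler declines the message.
            if (count >= limit || !handler.onMessage(message))
            {
                break;
            }

            bytes += message.length;
            count++;
        }

        return bytes;
    }
}
```

Since the list is never modified, calling forEach twice with the same arguments visits the same messages, which is the difference from consume that the summary above describes.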
-
consume
public int consume(ExpandableRingBuffer.MessageConsumer messageConsumer, int messageLimit)
Consume messages up to a limit and pass them to the ExpandableRingBuffer.MessageConsumer.
- Parameters:
messageConsumer - to which the encoded messages are passed.
messageLimit - on the number of messages to consume per read operation.
- Returns:
- the number of bytes consumed
- See Also:
ExpandableRingBuffer.MessageConsumer
-
append
public boolean append(DirectBuffer srcBuffer, int srcOffset, int srcLength)
Append a message into the ring buffer, expanding the buffer if required.
- Parameters:
srcBuffer - containing the encoded message.
srcOffset - within the buffer at which the message begins.
srcLength - of the encoded message in the buffer.
- Returns:
- true if successful, otherwise false if maxCapacity() is reached.
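The append contract above, expand if required and return false once maxCapacity() is reached, can be illustrated with a small capacity calculation. The following sketch assumes growth by doubling from a power-of-two capacity; the page does not state the growth policy, so that is an assumption, as are the class GrowthSketch and the method capacityFor. Only the 1 GB value of MAX_CAPACITY comes from the documentation.

```java
final class GrowthSketch
{
    // 1 GB, as stated in the description of MAX_CAPACITY.
    static final int MAX_CAPACITY = 1 << 30;

    // Returns a capacity able to hold requiredBytes, doubling from currentCapacity,
    // or -1 when that would exceed maxCapacity (the case where append would fail).
    static int capacityFor(final int currentCapacity, final int requiredBytes, final int maxCapacity)
    {
        if (requiredBytes > maxCapacity)
        {
            return -1;
        }

        // Use a long so repeated doubling cannot overflow an int.
        long capacity = Math.max(currentCapacity, 1);
        while (capacity < requiredBytes)
        {
            capacity <<= 1;
        }

        return capacity <= maxCapacity ? (int)capacity : -1;
    }
}
```

A caller of the real append method would see the failure case as a false return value, leaving it free to consume some messages and retry.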
-
resize
private void resize(int newMessageLength)
-
writeMessage
private void writeMessage(DirectBuffer srcBuffer, int srcOffset, int srcLength)
-
-