Interface RingBuffer

All Known Implementing Classes:
ManyToOneRingBuffer, OneToOneRingBuffer

public interface RingBuffer
Ring-buffer for the concurrent exchanging of binary encoded messages from producer(s) to consumer(s) in a FIFO manner.
  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    static final int
    INSUFFICIENT_CAPACITY
    Buffer has insufficient capacity to record a message or satisfy a tryClaim(int, int) request.
    static final int
    PADDING_MSG_TYPE_ID
    Record type is padding to prevent fragmentation in the buffer.
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    abort(int index)
    Abort claim and allow consumer to proceed after the claimed length.
    AtomicBuffer
    buffer()
    Get the underlying buffer used by the RingBuffer for storage.
    int
    capacity()
    Get the capacity of the ring-buffer in bytes for exchange.
    void
    commit(int index)
    Commit message that was written in the previously claimed space thus making it available to the consumer.
    long
    consumerHeartbeatTime()
    The time of the last consumer heartbeat.
    void
    consumerHeartbeatTime(long time)
    Set the time of the last consumer heartbeat.
    long
    consumerPosition()
    The position in bytes from start up for the consumers.
    int
    controlledRead(ControlledMessageHandler handler)
    Read as many messages as are available to the end of the ring buffer with the handler able to control progress.
    int
    controlledRead(ControlledMessageHandler handler, int messageCountLimit)
    Read messages, up to a supplied limit, that are available to the end of the ring buffer with the handler able to control progress.
    int
    maxMsgLength()
    The maximum message length in bytes supported by the underlying ring buffer.
    long
    nextCorrelationId()
    Get the next value that can be used for a correlation id on a message when a response needs to be correlated.
    long
    producerPosition()
    The position in bytes from start up of the producers.
    int
    read(MessageHandler handler)
    Read as many messages as are available to the end of the ring buffer.
    int
    read(MessageHandler handler, int messageCountLimit)
    Read as many messages as are available to the end of the ring buffer, up to a supplied maximum.
    int
    size()
    Size of the buffer backlog in bytes between producers and consumers.
    int
    tryClaim(int msgTypeId, int length)
    Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics.
    boolean
    unblock()
    Unblock a multi-producer ring buffer when a producer has died during the act of offering.
    boolean
    write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length)
    Non-blocking write of a message to an underlying ring-buffer.
  • Field Details

    • PADDING_MSG_TYPE_ID

      static final int PADDING_MSG_TYPE_ID
      Record type is padding to prevent fragmentation in the buffer.
    • INSUFFICIENT_CAPACITY

      static final int INSUFFICIENT_CAPACITY
      Buffer has insufficient capacity to record a message or satisfy a tryClaim(int, int) request.
  • Method Details

    • capacity

      int capacity()
      Get the capacity of the ring-buffer in bytes for exchange.
      Returns:
      the capacity of the ring-buffer in bytes for exchange.
    • write

      boolean write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length)
      Non-blocking write of a message to an underlying ring-buffer.
      Parameters:
      msgTypeId - type of the message encoding.
      srcBuffer - containing the encoded binary message.
      offset - at which the encoded message begins.
      length - of the encoded message in bytes.
      Returns:
      true if written to the ring-buffer, or false if insufficient space exists.
      Throws:
      IllegalArgumentException - if the length is negative or is greater than maxMsgLength().
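
      Because write is non-blocking and returns false when there is insufficient space, the caller decides how to react. The following is a minimal sketch of a bounded retry, assuming a hypothetical helper class WriteRetry (not part of this API) that receives the write attempt as a BooleanSupplier. The yield between attempts is only one possible back-off choice.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; it is not part of the RingBuffer API.
final class WriteRetry
{
    /**
     * Repeat a non-blocking write attempt until it succeeds or the attempt budget is spent.
     */
    static boolean offerWithRetry(final BooleanSupplier writeAttempt, final int maxAttempts)
    {
        for (int i = 0; i < maxAttempts; i++)
        {
            if (writeAttempt.getAsBoolean())
            {
                return true; // the message was written
            }

            Thread.yield(); // give the consumer a chance to free space
        }

        return false; // still insufficient space, the caller decides what to do next
    }
}
```

      A call could look like WriteRetry.offerWithRetry(() -> ringBuffer.write(msgTypeId, srcBuffer, offset, length), 10), where the budget of 10 attempts is an arbitrary value chosen for illustration.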
    • tryClaim

      int tryClaim(int msgTypeId, int length)
      Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. Once the message has been written, commit(int) should be called, thus making it available to be consumed. Alternatively, a claim can be aborted using the abort(int) method.

      Claiming a space in the ring-buffer means that the consumer will not be able to consume past the claim until the claimed space is either committed or aborted. Producers will be able to write messages even when outstanding claims exist.

      An example of using tryClaim:

       
           final RingBuffer ringBuffer = ...;
      
           final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
           if (index > 0)
           {
               try
               {
                   final AtomicBuffer buffer = ringBuffer.buffer();
                   // Work with the buffer directly using the index
                   ...
               }
               finally
               {
                   ringBuffer.commit(index); // commit message
               }
           }
       
       

      Ensure that claimed space is released even in case of an exception:

       
           final RingBuffer ringBuffer = ...;
      
           final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
           if (index > 0)
           {
               try
               {
                   final AtomicBuffer buffer = ringBuffer.buffer();
                   // Work with the buffer directly using the index
                   ...
                   ringBuffer.commit(index); // commit message
               }
               catch (final Exception ex)
               {
                   ringBuffer.abort(index); // allow consumer to proceed
                   ...
               }
           }
       
       
      Parameters:
      msgTypeId - type of the message encoding. Will be written into the header upon successful claim.
      length - of the claim in bytes. A claim length cannot be greater than maxMsgLength().
      Returns:
      a non-zero index into the underlying ring-buffer at which the encoded message begins, otherwise returns INSUFFICIENT_CAPACITY indicating that there is not enough free space in the buffer.
      Throws:
      IllegalArgumentException - if the msgTypeId is less than 1.
      IllegalArgumentException - if the length is negative or is greater than maxMsgLength().
    • commit

      void commit(int index)
      Commit message that was written in the previously claimed space thus making it available to the consumer.
      Parameters:
      index - at which the encoded message begins, i.e. value returned from the tryClaim(int, int) call.
      Throws:
      IllegalArgumentException - if the index is out of bounds.
      IllegalStateException - if this method is called after abort(int) or was already invoked for the given index.
    • abort

      void abort(int index)
      Abort claim and allow consumer to proceed after the claimed length. Aborting turns unused space into padding, i.e. changes type of the message to PADDING_MSG_TYPE_ID.
      Parameters:
      index - at which the encoded message begins, i.e. value returned from the tryClaim(int, int) call.
      Throws:
      IllegalArgumentException - if the index is out of bounds.
      IllegalStateException - if this method is called after commit(int) or was already invoked for the given index.
    • read

      int read(MessageHandler handler)
      Read as many messages as are available to the end of the ring buffer.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Parameters:
      handler - to be called for processing each message in turn.
      Returns:
      the number of messages that have been processed.
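
      Since a single read may return early at a wrap or padding record, a caller that wants to drain the backlog can poll again while size() reports remaining bytes. The following is a minimal sketch, assuming a hypothetical helper class DrainLoop (not part of this API) that receives the read call and the size() call as IntSupplier instances. The cap on passes is an assumption added so the loop cannot spin forever against active producers.

```java
import java.util.function.IntSupplier;

// Hypothetical helper for illustration; it is not part of the RingBuffer API.
final class DrainLoop
{
    /**
     * Call a read operation repeatedly until the backlog is empty or the pass budget is spent.
     *
     * @return the total number of messages processed across all passes.
     */
    static int drain(final IntSupplier readOnce, final IntSupplier backlogSize, final int maxPasses)
    {
        int total = 0;

        for (int pass = 0; pass < maxPasses; pass++)
        {
            total += readOnce.getAsInt(); // may stop early at a wrap or padding record

            if (backlogSize.getAsInt() == 0)
            {
                break; // nothing left to read at the time of the check
            }
        }

        return total;
    }
}
```

      A call could look like DrainLoop.drain(() -> ringBuffer.read(handler), ringBuffer::size, 3), where the budget of 3 passes is an arbitrary value chosen for illustration.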
    • read

      int read(MessageHandler handler, int messageCountLimit)
      Read as many messages as are available to the end of the ring buffer, up to a supplied maximum.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Parameters:
      handler - to be called for processing each message in turn.
      messageCountLimit - the maximum number of messages that will be read in a single invocation.
      Returns:
      the number of messages that have been processed.
    • controlledRead

      int controlledRead(ControlledMessageHandler handler)
      Read as many messages as are available to the end of the ring buffer with the handler able to control progress.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Parameters:
      handler - to be called for processing each message in turn which will return how to progress.
      Returns:
      the number of messages that have been processed.
    • controlledRead

      int controlledRead(ControlledMessageHandler handler, int messageCountLimit)
      Read messages, up to a supplied limit, that are available to the end of the ring buffer with the handler able to control progress.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Parameters:
      handler - to be called for processing each message in turn which will return how to progress.
      messageCountLimit - the maximum number of messages that will be read in a single invocation.
      Returns:
      the number of messages that have been processed.
    • maxMsgLength

      int maxMsgLength()
      The maximum message length in bytes supported by the underlying ring buffer.
      Returns:
      the maximum message length in bytes supported by the underlying ring buffer.
    • nextCorrelationId

      long nextCorrelationId()
      Get the next value that can be used for a correlation id on a message when a response needs to be correlated.

      This method should be thread safe.

      Returns:
      the next value in the correlation sequence.
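
      One way to obtain a thread-safe sequence is an atomic counter. The following is a minimal sketch, assuming a hypothetical class CorrelationSequence backed by an on-heap AtomicLong. It only illustrates the thread-safety requirement and is not necessarily how the implementing classes store their counter.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of a thread-safe correlation sequence.
final class CorrelationSequence
{
    private final AtomicLong counter = new AtomicLong();

    /**
     * Return the next value in the sequence; safe to call from multiple threads.
     */
    long nextCorrelationId()
    {
        return counter.getAndIncrement(); // atomic read-modify-write, so no value is handed out twice
    }
}
```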
    • buffer

      AtomicBuffer buffer()
      Get the underlying buffer used by the RingBuffer for storage.
      Returns:
      the underlying buffer used by the RingBuffer for storage.
    • consumerHeartbeatTime

      void consumerHeartbeatTime(long time)
      Set the time of the last consumer heartbeat.

      Note: The value for time must be valid across processes which means System.nanoTime() is not a valid option.

      Parameters:
      time - of the last consumer heartbeat.
    • consumerHeartbeatTime

      long consumerHeartbeatTime()
      The time of the last consumer heartbeat.
      Returns:
      the time of the last consumer heartbeat.
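
      A producer can compare the last heartbeat with its own clock to decide whether the consumer still appears to be alive. The following is a minimal sketch, assuming a hypothetical helper class ConsumerLiveness and assuming both sides use System.currentTimeMillis(), which is one clock whose values are comparable across processes on the same machine. The timeout is a value the application would have to choose.

```java
// Hypothetical helper for illustration; it is not part of the RingBuffer API.
final class ConsumerLiveness
{
    /**
     * Decide whether the last heartbeat is recent enough for the consumer to be considered alive.
     * All arguments are in milliseconds and must come from the same cross-process clock.
     */
    static boolean isConsumerAlive(final long lastHeartbeatMs, final long nowMs, final long timeoutMs)
    {
        return nowMs - lastHeartbeatMs <= timeoutMs;
    }
}
```

      Under these assumptions the consumer would call ringBuffer.consumerHeartbeatTime(System.currentTimeMillis()) periodically, and a producer would check ConsumerLiveness.isConsumerAlive(ringBuffer.consumerHeartbeatTime(), System.currentTimeMillis(), timeoutMs).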
    • producerPosition

      long producerPosition()
      The position in bytes from start up of the producers. The figure includes the headers. This is the range the producers have claimed, although they may still be in the act of writing to it.
      Returns:
      number of bytes produced by the producers in claimed space.
    • consumerPosition

      long consumerPosition()
      The position in bytes from start up for the consumers. The figure includes the headers.
      Returns:
      the count of bytes consumed by the consumers.
    • size

      int size()
      Size of the buffer backlog in bytes between producers and consumers. The value includes the size of headers.

      This method gives a concurrent snapshot of the buffer whereby a concurrent read or write may be partially complete and thus the value should be taken as an indication.

      Returns:
      size of the backlog of bytes in the buffer between producers and consumers.
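
      The backlog can be thought of as the distance between the producer and consumer positions. The following is a minimal sketch, assuming a hypothetical helper class Backlog. Clamping the result to the range from 0 to the capacity is an assumption made here to cope with positions sampled at slightly different moments, and the implementing classes may take the snapshot differently.

```java
// Hypothetical helper for illustration; it is not part of the RingBuffer API.
final class Backlog
{
    /**
     * Estimate the backlog in bytes from two position samples, clamped to [0, capacity].
     */
    static int backlogBytes(final long producerPosition, final long consumerPosition, final int capacity)
    {
        final long size = producerPosition - consumerPosition;

        if (size < 0)
        {
            return 0; // the consumer sample was taken after the producer sample
        }

        if (size > capacity)
        {
            return capacity; // samples are inconsistent, report a full buffer at most
        }

        return (int)size;
    }
}
```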
    • unblock

      boolean unblock()
      Unblock a multi-producer ring buffer when a producer has died during the act of offering. The operation will scan from the consumer position up to the producer position.

      If no action is required at the position then none will be taken.

      Returns:
      true if an unblocking action was taken, otherwise false.