Class OneToOneRingBuffer

java.lang.Object
org.agrona.concurrent.ringbuffer.OneToOneRingBuffer
All Implemented Interfaces:
RingBuffer

public final class OneToOneRingBuffer extends Object implements RingBuffer
A ring-buffer that supports the exchange of messages from a single producer to a single consumer.

This single-producer ring-buffer can be used in combination with a consumer using the ManyToOneRingBuffer, provided no other producers are accessing the same ring-buffer.

  • Field Details

    • MIN_CAPACITY

      public static final int MIN_CAPACITY
      Minimal required capacity of the ring buffer excluding RingBufferDescriptor.TRAILER_LENGTH.
    • capacity

      private final int capacity
    • maxMsgLength

      private final int maxMsgLength
    • tailPositionIndex

      private final int tailPositionIndex
    • headCachePositionIndex

      private final int headCachePositionIndex
    • headPositionIndex

      private final int headPositionIndex
    • correlationIdCounterIndex

      private final int correlationIdCounterIndex
    • consumerHeartbeatIndex

      private final int consumerHeartbeatIndex
    • buffer

      private final AtomicBuffer buffer
  • Constructor Details

  • Method Details

    • capacity

      public int capacity()
      Get the capacity of the ring-buffer in bytes for exchange.
      Specified by:
      capacity in interface RingBuffer
      Returns:
      the capacity of the ring-buffer in bytes for exchange.
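
      As a rough illustration of how the exchange capacity relates to the total buffer length, the sketch below subtracts a trailer length from the total and validates the remainder. The class name, the method name, and the trailer value of 128 are hypothetical; the real value is RingBufferDescriptor.TRAILER_LENGTH. The power-of-two check is an assumption about the capacity requirement and is not stated on this page.

```java
final class CapacitySketch
{
    /**
     * Derive the capacity available for message exchange from the total buffer length.
     * The trailerLength argument stands in for RingBufferDescriptor.TRAILER_LENGTH.
     */
    static int exchangeCapacity(final int totalBufferLength, final int trailerLength)
    {
        final int capacity = totalBufferLength - trailerLength;

        // Assumption: the data region must be a positive power of two.
        if (capacity <= 0 || Integer.bitCount(capacity) != 1)
        {
            throw new IllegalArgumentException(
                "capacity must be a positive power of 2 + trailer length: " + totalBufferLength);
        }

        return capacity;
    }

    public static void main(final String[] args)
    {
        final int assumedTrailerLength = 128; // illustrative value only
        System.out.println(exchangeCapacity(1024 + assumedTrailerLength, assumedTrailerLength));
    }
}
```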
    • write

      public boolean write(int msgTypeId, DirectBuffer srcBuffer, int offset, int length)
      Non-blocking write of a message to an underlying ring-buffer.
      Specified by:
      write in interface RingBuffer
      Parameters:
      msgTypeId - type of the message encoding.
      srcBuffer - containing the encoded binary message.
      offset - at which the encoded message begins.
      length - of the encoded message in bytes.
      Returns:
      true if written to the ring-buffer, or false if insufficient space exists.
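
      Because write is non-blocking and returns false when space is insufficient, callers typically decide how to retry. The following is a minimal sketch of a bounded retry loop. It does not use the ring-buffer classes directly: writeAttempt is a hypothetical stand-in for a call such as ringBuffer.write(msgTypeId, srcBuffer, offset, length), and the class and method names are illustrative.

```java
import java.util.function.BooleanSupplier;

final class WriteRetrySketch
{
    /**
     * Retry a non-blocking write attempt up to maxAttempts times.
     *
     * @return true once an attempt succeeds, or false if every attempt reported insufficient space.
     */
    static boolean writeWithRetry(final BooleanSupplier writeAttempt, final int maxAttempts)
    {
        for (int i = 0; i < maxAttempts; i++)
        {
            if (writeAttempt.getAsBoolean())
            {
                return true;
            }

            Thread.onSpinWait(); // hint to the CPU that we are busy-spinning (Java 9+)
        }

        return false;
    }

    public static void main(final String[] args)
    {
        final int[] calls = {0};
        // Stand-in that reports insufficient space twice and then succeeds.
        final boolean written = writeWithRetry(() -> ++calls[0] > 2, 5);
        System.out.println("written=" + written + " attempts=" + calls[0]);
    }
}
```

      A bounded loop lets the producer apply back pressure or drop the message rather than spin indefinitely when the consumer has stalled.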
    • tryClaim

      public int tryClaim(int msgTypeId, int length)
      Try to claim a space in the underlying ring-buffer into which a message can be written with zero copy semantics. Once the message has been written, RingBuffer.commit(int) should be called to make it available for consumption. Alternatively, a claim can be aborted using the RingBuffer.abort(int) method.

      Claiming a space in the ring-buffer means that the consumer will not be able to consume past the claim until the claimed space is either committed or aborted. Producers will be able to write messages even when outstanding claims exist.

      An example of using tryClaim:

       
           final RingBuffer ringBuffer = ...;
      
           final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
           if (index > 0)
           {
               try
               {
                   final AtomicBuffer buffer = ringBuffer.buffer();
                   // Work with the buffer directly using the index
                   ...
               }
               finally
               {
                   ringBuffer.commit(index); // commit message
               }
           }
       
       

      Ensure that claimed space is released even in case of an exception:

       
           final RingBuffer ringBuffer = ...;
      
           final int index = ringBuffer.tryClaim(msgTypeId, messageLength);
           if (index > 0)
           {
               try
               {
                   final AtomicBuffer buffer = ringBuffer.buffer();
                   // Work with the buffer directly using the index
                   ...
                   ringBuffer.commit(index); // commit message
               }
               catch (final Exception ex)
               {
                   ringBuffer.abort(index); // allow consumer to proceed
                   ...
               }
           }
       
       
      Specified by:
      tryClaim in interface RingBuffer
      Parameters:
      msgTypeId - type of the message encoding. Will be written into the header upon successful claim.
      length - of the claim in bytes. A claim length cannot be greater than RingBuffer.maxMsgLength().
      Returns:
      a non-zero index into the underlying ring-buffer at which encoded message begins, otherwise returns RingBuffer.INSUFFICIENT_CAPACITY indicating that there is not enough free space in the buffer.
    • commit

      public void commit(int index)
      Commit message that was written in the previously claimed space thus making it available to the consumer.
      Specified by:
      commit in interface RingBuffer
      Parameters:
      index - at which the encoded message begins, i.e. value returned from the RingBuffer.tryClaim(int, int) call.
    • abort

      public void abort(int index)
      Abort claim and allow consumer to proceed after the claimed length. Aborting turns unused space into padding, i.e. changes type of the message to RingBuffer.PADDING_MSG_TYPE_ID.
      Specified by:
      abort in interface RingBuffer
      Parameters:
      index - at which the encoded message begins, i.e. value returned from the RingBuffer.tryClaim(int, int) call.
    • read

      public int read(MessageHandler handler)
      Read as many messages as are available to the end of the ring buffer.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Specified by:
      read in interface RingBuffer
      Parameters:
      handler - to be called for processing each message in turn.
      Returns:
      the number of messages that have been processed.
    • read

      public int read(MessageHandler handler, int messageCountLimit)
      Read as many messages as are available to the end of the ring buffer, up to a supplied maximum.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Specified by:
      read in interface RingBuffer
      Parameters:
      handler - to be called for processing each message in turn.
      messageCountLimit - the maximum number of messages that will be read in a single invocation.
      Returns:
      the number of messages that have been processed.
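
      Since a single read may return early, for example at a wrap or a padding record, a consumer that wants to drain the buffer can consult the backlog before stopping. The sketch below illustrates that loop without using the ring-buffer classes: readOnce is a hypothetical stand-in for ringBuffer.read(handler), backlogBytes for ringBuffer.size(), and maxPasses is an illustrative bound so the loop cannot spin forever.

```java
import java.util.function.IntSupplier;

final class DrainSketch
{
    /**
     * Keep reading until a pass reads nothing and no backlog remains, or maxPasses is reached.
     *
     * @return the total number of messages read across all passes.
     */
    static int drain(final IntSupplier readOnce, final IntSupplier backlogBytes, final int maxPasses)
    {
        int total = 0;

        for (int pass = 0; pass < maxPasses; pass++)
        {
            final int messagesRead = readOnce.getAsInt();
            total += messagesRead;

            // A pass can read zero messages yet still leave bytes behind, so check the backlog too.
            if (messagesRead == 0 && backlogBytes.getAsInt() == 0)
            {
                break;
            }
        }

        return total;
    }

    public static void main(final String[] args)
    {
        final int[] perPass = {3, 0, 2, 0};       // the second pass reads no messages
        final int[] backlogAfter = {32, 16, 0, 0}; // bytes remaining after each pass
        final int[] pass = {0};

        final int total = drain(() -> perPass[pass[0]++], () -> backlogAfter[pass[0] - 1], 10);
        System.out.println("messages read: " + total);
    }
}
```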
    • controlledRead

      public int controlledRead(ControlledMessageHandler handler)
      Read as many messages as are available to the end of the ring buffer with the handler able to control progress.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Specified by:
      controlledRead in interface RingBuffer
      Parameters:
      handler - to be called for processing each message in turn which will return how to progress.
      Returns:
      the number of messages that have been processed.
    • controlledRead

      public int controlledRead(ControlledMessageHandler handler, int messageCountLimit)
      Read messages, up to a supplied limit, that are available to the end of the ring buffer with the handler able to control progress.

      If the ring buffer wraps or encounters a type of record, such as a padding record, then an implementation may choose to return and expect the caller to try again. The RingBuffer.size() method may be called to determine if a backlog of message bytes remains in the ring buffer.

      Specified by:
      controlledRead in interface RingBuffer
      Parameters:
      handler - to be called for processing each message in turn which will return how to progress.
      messageCountLimit - the maximum number of messages that will be read in a single invocation.
      Returns:
      the number of messages that have been processed.
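
      To make "the handler able to control progress" more concrete, here is a simplified model of a controlled read over an in-memory list. It is not the ring-buffer implementation: the Action enum is a stand-in that mirrors the names in ControlledMessageHandler.Action, the list index plays the role of the consumer position, and the behaviour shown for each action is an assumption based on those names.

```java
import java.util.List;
import java.util.function.Function;

final class ControlledReadSketch
{
    /** Stand-in mirroring the names in ControlledMessageHandler.Action. */
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    /** Index of the next message to consume; plays the role of the consumer position. */
    private int head;

    int head()
    {
        return head;
    }

    int controlledRead(final List<String> messages, final Function<String, Action> handler, final int limit)
    {
        int messagesRead = 0;
        int cursor = head;

        while (messagesRead < limit && cursor < messages.size())
        {
            final Action action = handler.apply(messages.get(cursor));
            if (Action.ABORT == action)
            {
                break; // message is not consumed and is offered again on the next call
            }

            cursor++;
            messagesRead++;

            if (Action.BREAK == action)
            {
                break; // stop after this message
            }

            if (Action.COMMIT == action)
            {
                head = cursor; // publish progress immediately, then keep going
            }
        }

        head = cursor; // publish whatever was consumed in this call
        return messagesRead;
    }

    public static void main(final String[] args)
    {
        final ControlledReadSketch reader = new ControlledReadSketch();
        final List<String> messages = List.of("a", "b", "c", "d");
        final int read = reader.controlledRead(
            messages, (msg) -> msg.equals("c") ? Action.ABORT : Action.CONTINUE, 10);
        System.out.println("read=" + read + " head=" + reader.head());
    }
}
```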
    • maxMsgLength

      public int maxMsgLength()
      The maximum message length in bytes supported by the underlying ring buffer.
      Specified by:
      maxMsgLength in interface RingBuffer
      Returns:
      the maximum message length in bytes supported by the underlying ring buffer.
    • nextCorrelationId

      public long nextCorrelationId()
      Get the next value that can be used for a correlation id on a message when a response needs to be correlated.

      This method should be thread safe.

      Specified by:
      nextCorrelationId in interface RingBuffer
      Returns:
      the next value in the correlation sequence.
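
      The thread-safety expectation can be illustrated with an atomic counter. This is a sketch only: it uses java.util.concurrent.atomic.AtomicLong as a stand-in for the counter held in the underlying buffer, and the class and method names other than nextCorrelationId are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class CorrelationIdSketch
{
    private final AtomicLong counter = new AtomicLong();

    /** Atomically hand out the next value so concurrent callers never see the same id. */
    long nextCorrelationId()
    {
        return counter.getAndIncrement();
    }

    /** Generate ids from several threads and report how many distinct values were observed. */
    static int distinctIds(final int threadCount, final int idsPerThread)
    {
        final CorrelationIdSketch ids = new CorrelationIdSketch();
        final Set<Long> seen = ConcurrentHashMap.newKeySet();
        final Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(() ->
            {
                for (int j = 0; j < idsPerThread; j++)
                {
                    seen.add(ids.nextCorrelationId());
                }
            });
            threads[i].start();
        }

        for (final Thread thread : threads)
        {
            try
            {
                thread.join();
            }
            catch (final InterruptedException ex)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }

        return seen.size();
    }

    public static void main(final String[] args)
    {
        System.out.println(distinctIds(4, 1_000));
    }
}
```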
    • buffer

      public AtomicBuffer buffer()
      Get the underlying buffer used by the RingBuffer for storage.
      Specified by:
      buffer in interface RingBuffer
      Returns:
      the underlying buffer used by the RingBuffer for storage.
    • consumerHeartbeatTime

      public void consumerHeartbeatTime(long time)
      Set the time of the last consumer heartbeat.

      Note: The value for time must be valid across processes which means System.nanoTime() is not a valid option.

      Specified by:
      consumerHeartbeatTime in interface RingBuffer
      Parameters:
      time - of the last consumer heartbeat.
    • consumerHeartbeatTime

      public long consumerHeartbeatTime()
      The time of the last consumer heartbeat.
      Specified by:
      consumerHeartbeatTime in interface RingBuffer
      Returns:
      the time of the last consumer heartbeat.
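
      The note about System.nanoTime() matters when another process inspects the heartbeat. A minimal sketch of a liveness check follows, assuming the consumer records wall-clock epoch milliseconds from System.currentTimeMillis(). The class name, method name, and timeout are hypothetical, and lastHeartbeatMs stands in for the value returned by consumerHeartbeatTime().

```java
final class HeartbeatSketch
{
    /**
     * Decide whether the consumer is still considered alive.
     *
     * @param lastHeartbeatMs last recorded heartbeat in epoch milliseconds.
     * @param nowMs           current time in epoch milliseconds.
     * @param timeoutMs       maximum tolerated gap between heartbeats.
     */
    static boolean isConsumerAlive(final long lastHeartbeatMs, final long nowMs, final long timeoutMs)
    {
        return nowMs - lastHeartbeatMs <= timeoutMs;
    }

    public static void main(final String[] args)
    {
        // Epoch time is comparable across processes; System.nanoTime() has an arbitrary origin per JVM.
        final long lastHeartbeatMs = System.currentTimeMillis();
        final long nowMs = lastHeartbeatMs + 250;
        System.out.println(isConsumerAlive(lastHeartbeatMs, nowMs, 1_000));
    }
}
```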
    • producerPosition

      public long producerPosition()
      The position in bytes from start up of the producers. The figure includes the headers. This is the range the producers are working with, although they may still be in the act of working with it.
      Specified by:
      producerPosition in interface RingBuffer
      Returns:
      number of bytes produced by the producers in claimed space.
    • consumerPosition

      public long consumerPosition()
      The position in bytes from start up for the consumers. The figure includes the headers.
      Specified by:
      consumerPosition in interface RingBuffer
      Returns:
      the count of bytes consumed by the consumers.
    • size

      public int size()
      Size of the buffer backlog in bytes between producers and consumers. The value includes the size of headers.

      This method gives a concurrent snapshot of the buffer whereby a concurrent read or write may be partially complete and thus the value should be taken as an indication.

      Specified by:
      size in interface RingBuffer
      Returns:
      size of the backlog of bytes in the buffer between producers and consumers.
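
      One way such a concurrent snapshot could be taken is to re-read the consumer position until it is stable around a read of the producer position, then clamp the difference. The sketch below is an illustration under that assumption, not the implementation: it uses AtomicLong values as stand-ins for the positions stored in the underlying buffer, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class BacklogSketch
{
    /**
     * Estimate the backlog in bytes between the producer (tail) and consumer (head) positions.
     * The result is clamped to the range [0, capacity] because the reads are not atomic as a pair.
     */
    static int size(final AtomicLong head, final AtomicLong tail, final int capacity)
    {
        long headBefore;
        long tailNow;
        long headAfter = head.get();

        do
        {
            headBefore = headAfter;
            tailNow = tail.get();
            headAfter = head.get();
        }
        while (headAfter != headBefore); // retry if the consumer moved while we sampled the tail

        final long size = tailNow - headAfter;
        if (size < 0)
        {
            return 0;
        }
        else if (size > capacity)
        {
            return capacity;
        }

        return (int)size;
    }

    public static void main(final String[] args)
    {
        System.out.println(size(new AtomicLong(100), new AtomicLong(164), 1024));
    }
}
```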
    • unblock

      public boolean unblock()
      Unblock a multi-producer ring buffer when a producer has died during the act of offering. The operation will scan from the consumer position up to the producer position.

      If no action is required at the position then none will be taken.

      Specified by:
      unblock in interface RingBuffer
      Returns:
      true if an unblocking action was taken, otherwise false.
    • checkMsgLength

      private void checkMsgLength(int length)
    • claimCapacity

      private int claimCapacity(AtomicBuffer buffer, int recordLength)
    • computeRecordIndex

      private int computeRecordIndex(int index)
    • verifyClaimedSpaceNotReleased

      private int verifyClaimedSpaceNotReleased(AtomicBuffer buffer, int recordIndex)