Class SubmissionPublisher.BufferedSubscription<T>

  • All Implemented Interfaces:
    java.util.concurrent.ForkJoinPool.ManagedBlocker, Flow.Subscription
    Enclosing class:
    SubmissionPublisher<T>

    private static final class SubmissionPublisher.BufferedSubscription<T>
    extends java.lang.Object
    implements Flow.Subscription, java.util.concurrent.ForkJoinPool.ManagedBlocker
    A bounded (ring) buffer with integrated control to start a consumer task whenever items are available. The buffer algorithm is similar to one used inside ForkJoinPool (see its internal documentation for details) specialized for the case of at most one concurrent producer and consumer, and power of two buffer sizes. This allows methods to operate without locks even while supporting resizing, blocking, task-triggering, and garbage-free buffers (nulling out elements when consumed), although supporting these does impose a bit of overhead compared to plain fixed-size ring buffers.

    The publisher guarantees a single producer via its lock. We ensure in this class that there is at most one consumer. The request and cancel methods must be fully thread-safe but are coded to exploit the most common case in which they are only called by consumers (usually within onNext).

    Execution control is managed using the ACTIVE ctl bit. We ensure that a task is active when consumable items (and usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and there is demand (unfilled requests). This is complicated on the creation side by the possibility of exceptions when trying to execute tasks. These eventually force DISABLED state, but sometimes not directly. On the task side, termination (clearing ACTIVE) that would otherwise race with producers or request() calls uses the CONSUME keep-alive bit to force a recheck.

    The ctl field also manages run state. When DISABLED, no further updates are possible. Disabling may be preceded by setting ERROR or COMPLETE (or both -- ERROR has precedence), in which case the associated Subscriber methods are invoked, possibly synchronously if there is no active consumer task (including cases where execute() failed). The cancel() method is supported by treating as ERROR but suppressing onError signal.

    Support for blocking also exploits the fact that there is only one possible waiter. ManagedBlocker-compatible control fields are placed in this class itself rather than in wait-nodes. Blocking control relies on the "waiter" field. Producers set the field before trying to block, but must then recheck (via offer) before parking. Signalling then just unparks and clears waiter field. If the producer and/or consumer are using a ForkJoinPool, the producer attempts to help run consumer tasks via ForkJoinPool.helpAsyncBlocker before blocking.

    • Constructor Summary

      Constructors 
      Constructor Description
      BufferedSubscription​(Flow.Subscriber<? super T> subscriber, java.util.concurrent.Executor executor, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,​? super java.lang.Throwable> onNextHandler, int maxBufferCapacity)  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      boolean block()  
      void cancel()
      Causes consumer task to exit if active (without reporting onError unless there is already a pending error), and disables.
      private boolean checkControl​(Flow.Subscriber<? super T> s, int c)
      Responds to control events in consume().
      private boolean checkDemand​(int c)
      Responds to apparent zero demand in consume().
      private boolean checkEmpty​(Flow.Subscriber<? super T> s, int c)
      Responds to apparent emptiness in consume().
      (package private) void consume()
      Consumer loop, called from ConsumerTask, or indirectly when helping during submit.
      private void detach()
      Nulls out most fields, mainly to avoid garbage retention until publisher unsubscribes, but also to help cleanly stop upon error by nulling required components.
      (package private) int estimateLag()
      Returns estimated number of buffered items, or -1 if disabled.
      private int growAndAdd​(java.lang.Object[] a, T item)
      Tries to create or expand buffer, then adds item if possible.
      private void handleOnNext​(Flow.Subscriber<? super T> s, java.lang.Throwable ex)
      Processes exception in Subscriber.onNext.
      (package private) boolean isDisabled()  
      boolean isReleasable()  
      (package private) int offer​(T item)
      Tries to add item and start consumer task if necessary.
      (package private) void onComplete()  
      (package private) void onError​(java.lang.Throwable ex)
      Issues error signal, asynchronously if a task is running, else synchronously.
      (package private) void onSubscribe()  
      void request​(long n)
      Adds to demand and possibly starts task.
      private void signalWaiter​(java.lang.Thread w)  
      private int startOnOffer​(int stat)
      Tries to start consumer task after offer.
      private void startOrDisable()
      Tries to start consumer task upon a signal or request; disables on failure.
      (package private) int submit​(T item)
      Spins/helps/blocks while offer returns 0.
      (package private) int timedOffer​(T item, long nanos)
      Timeout version; similar to submit.
      java.lang.String toString()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
    • Field Detail

      • timeout

        long timeout
      • demand

        volatile long demand
      • maxCapacity

        int maxCapacity
      • putStat

        int putStat
      • ctl

        volatile int ctl
      • head

        volatile int head
      • tail

        int tail
      • array

        java.lang.Object[] array
      • executor

        java.util.concurrent.Executor executor
      • onNextHandler

        java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,​? super java.lang.Throwable> onNextHandler
      • pendingError

        volatile java.lang.Throwable pendingError
      • waiter

        volatile java.lang.Thread waiter
      • putItem

        T putItem
      • DEFAULT_INITIAL_CAP

        static final int DEFAULT_INITIAL_CAP
        Initial buffer capacity used when maxBufferCapacity is greater. Must be a power of two.
        See Also:
        Constant Field Values
      • U

        private static final sun.misc.Unsafe U
      • CTL

        private static final long CTL
      • TAIL

        private static final long TAIL
      • HEAD

        private static final long HEAD
      • DEMAND

        private static final long DEMAND
      • ABASE

        private static final int ABASE
      • ASHIFT

        private static final int ASHIFT
    • Constructor Detail

      • BufferedSubscription

        BufferedSubscription​(Flow.Subscriber<? super T> subscriber,
                             java.util.concurrent.Executor executor,
                             java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,​? super java.lang.Throwable> onNextHandler,
                             int maxBufferCapacity)
    • Method Detail

      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object
      • isDisabled

        final boolean isDisabled()
      • estimateLag

        final int estimateLag()
        Returns estimated number of buffered items, or -1 if disabled.
      • offer

        final int offer​(T item)
        Tries to add item and start consumer task if necessary.
        Returns:
        -1 if disabled, 0 if dropped, else estimated lag
      • growAndAdd

        private int growAndAdd​(java.lang.Object[] a,
                               T item)
        Tries to create or expand buffer, then adds item if possible.
      • submit

        final int submit​(T item)
        Spins/helps/blocks while offer returns 0. Called only if initial offer return 0.
      • timedOffer

        final int timedOffer​(T item,
                             long nanos)
        Timeout version; similar to submit.
      • startOnOffer

        private int startOnOffer​(int stat)
        Tries to start consumer task after offer.
        Returns:
        -1 if now disabled, else argument
      • signalWaiter

        private void signalWaiter​(java.lang.Thread w)
      • detach

        private void detach()
        Nulls out most fields, mainly to avoid garbage retention until publisher unsubscribes, but also to help cleanly stop upon error by nulling required components.
      • onError

        final void onError​(java.lang.Throwable ex)
        Issues error signal, asynchronously if a task is running, else synchronously.
      • startOrDisable

        private void startOrDisable()
        Tries to start consumer task upon a signal or request; disables on failure.
      • onComplete

        final void onComplete()
      • onSubscribe

        final void onSubscribe()
      • cancel

        public void cancel()
        Causes consumer task to exit if active (without reporting onError unless there is already a pending error), and disables.
        Specified by:
        cancel in interface Flow.Subscription
      • request

        public void request​(long n)
        Adds to demand and possibly starts task.
        Specified by:
        request in interface Flow.Subscription
        Parameters:
        n - the increment of demand; a value of Long.MAX_VALUE may be considered as effectively unbounded
      • isReleasable

        public final boolean isReleasable()
        Specified by:
        isReleasable in interface java.util.concurrent.ForkJoinPool.ManagedBlocker
      • block

        public final boolean block()
        Specified by:
        block in interface java.util.concurrent.ForkJoinPool.ManagedBlocker
      • consume

        final void consume()
        Consumer loop, called from ConsumerTask, or indirectly when helping during submit.
      • checkControl

        private boolean checkControl​(Flow.Subscriber<? super T> s,
                                     int c)
        Responds to control events in consume().
      • checkEmpty

        private boolean checkEmpty​(Flow.Subscriber<? super T> s,
                                   int c)
        Responds to apparent emptiness in consume().
      • checkDemand

        private boolean checkDemand​(int c)
        Responds to apparent zero demand in consume().
      • handleOnNext

        private void handleOnNext​(Flow.Subscriber<? super T> s,
                                  java.lang.Throwable ex)
        Processes exception in Subscriber.onNext.