Class SubmissionPublisher.BufferedSubscription<T>
- java.lang.Object
-
- org.glassfish.jersey.internal.jsr166.SubmissionPublisher.BufferedSubscription<T>
-
- All Implemented Interfaces:
java.util.concurrent.ForkJoinPool.ManagedBlocker
,Flow.Subscription
- Enclosing class:
- SubmissionPublisher<T>
private static final class SubmissionPublisher.BufferedSubscription<T> extends java.lang.Object implements Flow.Subscription, java.util.concurrent.ForkJoinPool.ManagedBlocker
A bounded (ring) buffer with integrated control to start a consumer task whenever items are available. The buffer algorithm is similar to one used inside ForkJoinPool (see its internal documentation for details) specialized for the case of at most one concurrent producer and consumer, and power of two buffer sizes. This allows methods to operate without locks even while supporting resizing, blocking, task-triggering, and garbage-free buffers (nulling out elements when consumed), although supporting these does impose a bit of overhead compared to plain fixed-size ring buffers.The publisher guarantees a single producer via its lock. We ensure in this class that there is at most one consumer. The request and cancel methods must be fully thread-safe but are coded to exploit the most common case in which they are only called by consumers (usually within onNext).
Execution control is managed using the ACTIVE ctl bit. We ensure that a task is active when consumable items (and usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and there is demand (unfilled requests). This is complicated on the creation side by the possibility of exceptions when trying to execute tasks. These eventually force DISABLED state, but sometimes not directly. On the task side, termination (clearing ACTIVE) that would otherwise race with producers or request() calls uses the CONSUME keep-alive bit to force a recheck.
The ctl field also manages run state. When DISABLED, no further updates are possible. Disabling may be preceded by setting ERROR or COMPLETE (or both -- ERROR has precedence), in which case the associated Subscriber methods are invoked, possibly synchronously if there is no active consumer task (including cases where execute() failed). The cancel() method is supported by treating as ERROR but suppressing onError signal.
Support for blocking also exploits the fact that there is only one possible waiter. ManagedBlocker-compatible control fields are placed in this class itself rather than in wait-nodes. Blocking control relies on the "waiter" field. Producers set the field before trying to block, but must then recheck (via offer) before parking. Signalling then just unparks and clears waiter field. If the producer and/or consumer are using a ForkJoinPool, the producer attempts to help run consumer tasks via ForkJoinPool.helpAsyncBlocker before blocking.
-
-
Field Summary
Fields Modifier and Type Field Description private static int
ABASE
(package private) static int
ACTIVE
(package private) java.lang.Object[]
array
private static int
ASHIFT
(package private) static int
COMPLETE
(package private) static int
CONSUME
(package private) int
ctl
private static long
CTL
(package private) static int
DEFAULT_INITIAL_CAP
Initial buffer capacity used when maxBufferCapacity is greater.(package private) long
demand
private static long
DEMAND
(package private) static int
DISABLED
(package private) static int
ERROR
(package private) java.util.concurrent.Executor
executor
(package private) int
head
private static long
HEAD
(package private) static long
INTERRUPTED
(package private) int
maxCapacity
(package private) SubmissionPublisher.BufferedSubscription<T>
next
(package private) SubmissionPublisher.BufferedSubscription<T>
nextRetry
(package private) java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable>
onNextHandler
(package private) java.lang.Throwable
pendingError
(package private) T
putItem
(package private) int
putStat
(package private) static int
SUBSCRIBE
(package private) Flow.Subscriber<? super T>
subscriber
(package private) int
tail
private static long
TAIL
(package private) long
timeout
private static sun.misc.Unsafe
U
(package private) java.lang.Thread
waiter
-
Constructor Summary
Constructors Constructor Description BufferedSubscription(Flow.Subscriber<? super T> subscriber, java.util.concurrent.Executor executor, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler, int maxBufferCapacity)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description boolean
block()
void
cancel()
Causes consumer task to exit if active (without reporting onError unless there is already a pending error), and disables.private boolean
checkControl(Flow.Subscriber<? super T> s, int c)
Responds to control events in consume().private boolean
checkDemand(int c)
Responds to apparent zero demand in consume().private boolean
checkEmpty(Flow.Subscriber<? super T> s, int c)
Responds to apparent emptiness in consume().(package private) void
consume()
Consumer loop, called from ConsumerTask, or indirectly when helping during submit.private void
detach()
Nulls out most fields, mainly to avoid garbage retention until publisher unsubscribes, but also to help cleanly stop upon error by nulling required components.(package private) int
estimateLag()
Returns estimated number of buffered items, or -1 if disabled.private int
growAndAdd(java.lang.Object[] a, T item)
Tries to create or expand buffer, then adds item if possible.private void
handleOnNext(Flow.Subscriber<? super T> s, java.lang.Throwable ex)
Processes exception in Subscriber.onNext.(package private) boolean
isDisabled()
boolean
isReleasable()
(package private) int
offer(T item)
Tries to add item and start consumer task if necessary.(package private) void
onComplete()
(package private) void
onError(java.lang.Throwable ex)
Issues error signal, asynchronously if a task is running, else synchronously.(package private) void
onSubscribe()
void
request(long n)
Adds to demand and possibly starts task.private void
signalWaiter(java.lang.Thread w)
private int
startOnOffer(int stat)
Tries to start consumer task after offer.private void
startOrDisable()
Tries to start consumer task upon a signal or request; disables on failure.(package private) int
submit(T item)
Spins/helps/blocks while offer returns 0.(package private) int
timedOffer(T item, long nanos)
Timeout version; similar to submit.java.lang.String
toString()
-
-
-
Field Detail
-
timeout
long timeout
-
demand
volatile long demand
-
maxCapacity
int maxCapacity
-
putStat
int putStat
-
ctl
volatile int ctl
-
head
volatile int head
-
tail
int tail
-
array
java.lang.Object[] array
-
subscriber
Flow.Subscriber<? super T> subscriber
-
executor
java.util.concurrent.Executor executor
-
onNextHandler
java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler
-
pendingError
volatile java.lang.Throwable pendingError
-
waiter
volatile java.lang.Thread waiter
-
putItem
T putItem
-
next
SubmissionPublisher.BufferedSubscription<T> next
-
nextRetry
SubmissionPublisher.BufferedSubscription<T> nextRetry
-
ACTIVE
static final int ACTIVE
- See Also:
- Constant Field Values
-
CONSUME
static final int CONSUME
- See Also:
- Constant Field Values
-
DISABLED
static final int DISABLED
- See Also:
- Constant Field Values
-
ERROR
static final int ERROR
- See Also:
- Constant Field Values
-
SUBSCRIBE
static final int SUBSCRIBE
- See Also:
- Constant Field Values
-
COMPLETE
static final int COMPLETE
- See Also:
- Constant Field Values
-
INTERRUPTED
static final long INTERRUPTED
- See Also:
- Constant Field Values
-
DEFAULT_INITIAL_CAP
static final int DEFAULT_INITIAL_CAP
Initial buffer capacity used when maxBufferCapacity is greater. Must be a power of two.- See Also:
- Constant Field Values
-
U
private static final sun.misc.Unsafe U
-
CTL
private static final long CTL
-
TAIL
private static final long TAIL
-
HEAD
private static final long HEAD
-
DEMAND
private static final long DEMAND
-
ABASE
private static final int ABASE
-
ASHIFT
private static final int ASHIFT
-
-
Constructor Detail
-
BufferedSubscription
BufferedSubscription(Flow.Subscriber<? super T> subscriber, java.util.concurrent.Executor executor, java.util.function.BiConsumer<? super Flow.Subscriber<? super T>,? super java.lang.Throwable> onNextHandler, int maxBufferCapacity)
-
-
Method Detail
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
isDisabled
final boolean isDisabled()
-
estimateLag
final int estimateLag()
Returns estimated number of buffered items, or -1 if disabled.
-
offer
final int offer(T item)
Tries to add item and start consumer task if necessary.- Returns:
- -1 if disabled, 0 if dropped, else estimated lag
-
growAndAdd
private int growAndAdd(java.lang.Object[] a, T item)
Tries to create or expand buffer, then adds item if possible.
-
submit
final int submit(T item)
Spins/helps/blocks while offer returns 0. Called only if initial offer return 0.
-
timedOffer
final int timedOffer(T item, long nanos)
Timeout version; similar to submit.
-
startOnOffer
private int startOnOffer(int stat)
Tries to start consumer task after offer.- Returns:
- -1 if now disabled, else argument
-
signalWaiter
private void signalWaiter(java.lang.Thread w)
-
detach
private void detach()
Nulls out most fields, mainly to avoid garbage retention until publisher unsubscribes, but also to help cleanly stop upon error by nulling required components.
-
onError
final void onError(java.lang.Throwable ex)
Issues error signal, asynchronously if a task is running, else synchronously.
-
startOrDisable
private void startOrDisable()
Tries to start consumer task upon a signal or request; disables on failure.
-
onComplete
final void onComplete()
-
onSubscribe
final void onSubscribe()
-
cancel
public void cancel()
Causes consumer task to exit if active (without reporting onError unless there is already a pending error), and disables.- Specified by:
cancel
in interfaceFlow.Subscription
-
request
public void request(long n)
Adds to demand and possibly starts task.- Specified by:
request
in interfaceFlow.Subscription
- Parameters:
n
- the increment of demand; a value ofLong.MAX_VALUE
may be considered as effectively unbounded
-
isReleasable
public final boolean isReleasable()
- Specified by:
isReleasable
in interfacejava.util.concurrent.ForkJoinPool.ManagedBlocker
-
block
public final boolean block()
- Specified by:
block
in interfacejava.util.concurrent.ForkJoinPool.ManagedBlocker
-
consume
final void consume()
Consumer loop, called from ConsumerTask, or indirectly when helping during submit.
-
checkControl
private boolean checkControl(Flow.Subscriber<? super T> s, int c)
Responds to control events in consume().
-
checkEmpty
private boolean checkEmpty(Flow.Subscriber<? super T> s, int c)
Responds to apparent emptiness in consume().
-
checkDemand
private boolean checkDemand(int c)
Responds to apparent zero demand in consume().
-
handleOnNext
private void handleOnNext(Flow.Subscriber<? super T> s, java.lang.Throwable ex)
Processes exception in Subscriber.onNext.
-
-