Class ParallelFlowable<T>
- java.lang.Object
-
- io.reactivex.rxjava3.parallel.ParallelFlowable<T>
-
- Type Parameters:
T
- the value type
- Direct Known Subclasses:
ParallelCollect, ParallelConcatMap, ParallelDoOnNextTry, ParallelFilter, ParallelFilterTry, ParallelFlatMap, ParallelFlatMapIterable, ParallelFlatMapStream, ParallelFromArray, ParallelFromPublisher, ParallelMap, ParallelMapOptional, ParallelMapTry, ParallelMapTryOptional, ParallelPeek, ParallelReduce, ParallelRunOn
public abstract class ParallelFlowable<@NonNull T> extends java.lang.Object
Abstract base class for parallel publishing of events signaled to an array of Subscribers.
Use from(Publisher) to start processing a regular Publisher in 'rails'. Use runOn(Scheduler) to specify, thread-wise, where each 'rail' should run. Use sequential() to merge the sources back into a single Flowable.
History: 2.0.5 - experimental; 2.1 - beta
- Since:
- 2.2
-
-
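The flow described above (from/parallel → runOn → operator → sequential) can be sketched as follows. This is a minimal illustration, not part of the official documentation; the class name and values are made up for the example, and `Flowable.parallel()` is used as the convenience equivalent of `ParallelFlowable.from(Publisher)`:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class ParallelFlowableOverview {
    public static void main(String[] args) {
        // Split a regular Flowable into rails, run each rail on the
        // computation scheduler, transform in parallel, then merge back.
        List<Integer> result = Flowable.range(1, 10)
                .parallel()                       // ParallelFlowable<Integer>
                .runOn(Schedulers.computation())  // each rail gets its own worker
                .map(v -> v * v)                  // may run concurrently per rail
                .sequential()                     // back to a single Flowable
                .toList()
                .blockingGet();

        // All 10 squares arrive, but the merge order across rails may vary.
        System.out.println(result.size());
    }
}
```

Note that after `sequential()`, element order across rails is not guaranteed; use `sorted(Comparator)` or `toSortedList(Comparator)` when a total order is needed.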
Constructor Summary
ParallelFlowable()
-
Method Summary
<@NonNull C> @NonNull ParallelFlowable<C> collect(@NonNull Supplier<? extends @NonNull C> collectionSupplier, @NonNull BiConsumer<? super @NonNull C, ? super @NonNull T> collector)
- Collect the elements in each rail into a collection supplied via a collectionSupplier and collected into with a collector action, emitting the collection at the end.
<@NonNull A, @NonNull R> @NonNull Flowable<R> collect(@NonNull java.util.stream.Collector<@NonNull T, @NonNull A, @NonNull R> collector)
- Reduces all values within a 'rail' and across 'rails' with the callbacks of the given Collector into one Flowable containing a single value.
<@NonNull U> @NonNull ParallelFlowable<U> compose(@NonNull ParallelTransformer<@NonNull T, @NonNull U> composer)
- Allows composing operators, at assembly time, on top of this ParallelFlowable and returns another ParallelFlowable with composed features.
<@NonNull R> @NonNull ParallelFlowable<R> concatMap(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper)
- Generates and concatenates Publishers on each 'rail', signalling errors immediately and generating 2 publishers upfront.
<@NonNull R> @NonNull ParallelFlowable<R> concatMap(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, int prefetch)
- Generates and concatenates Publishers on each 'rail', signalling errors immediately and using the given prefetch amount for generating Publishers upfront.
<@NonNull R> @NonNull ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean tillTheEnd)
- Generates and concatenates Publishers on each 'rail', optionally delaying errors and generating 2 publishers upfront.
<@NonNull R> @NonNull ParallelFlowable<R> concatMapDelayError(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, int prefetch, boolean tillTheEnd)
- Generates and concatenates Publishers on each 'rail', optionally delaying errors and using the given prefetch amount for generating Publishers upfront.
@NonNull ParallelFlowable<T> doAfterNext(@NonNull Consumer<? super @NonNull T> onAfterNext)
- Call the specified consumer with the current element passing through any 'rail' after it has been delivered to downstream within the rail.
@NonNull ParallelFlowable<T> doAfterTerminated(@NonNull Action onAfterTerminate)
- Run the specified Action when a 'rail' completes or signals an error.
@NonNull ParallelFlowable<T> doOnCancel(@NonNull Action onCancel)
- Run the specified Action when a 'rail' receives a cancellation.
@NonNull ParallelFlowable<T> doOnComplete(@NonNull Action onComplete)
- Run the specified Action when a 'rail' completes.
@NonNull ParallelFlowable<T> doOnError(@NonNull Consumer<? super java.lang.Throwable> onError)
- Call the specified consumer with the exception passing through any 'rail'.
@NonNull ParallelFlowable<T> doOnNext(@NonNull Consumer<? super @NonNull T> onNext)
- Call the specified consumer with the current element passing through any 'rail'.
@NonNull ParallelFlowable<T> doOnNext(@NonNull Consumer<? super @NonNull T> onNext, @NonNull BiFunction<? super java.lang.Long, ? super java.lang.Throwable, ParallelFailureHandling> errorHandler)
- Call the specified consumer with the current element passing through any 'rail' and handles errors based on the value returned by the handler function.
@NonNull ParallelFlowable<T> doOnNext(@NonNull Consumer<? super @NonNull T> onNext, @NonNull ParallelFailureHandling errorHandler)
- Call the specified consumer with the current element passing through any 'rail' and handles errors based on the given ParallelFailureHandling enumeration value.
@NonNull ParallelFlowable<T> doOnRequest(@NonNull LongConsumer onRequest)
- Call the specified consumer with the request amount if any rail receives a request.
@NonNull ParallelFlowable<T> doOnSubscribe(@NonNull Consumer<? super org.reactivestreams.Subscription> onSubscribe)
- Call the specified callback when a 'rail' receives a Subscription from its upstream.
@NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super @NonNull T> predicate)
- Filters the source values on each 'rail'.
@NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super @NonNull T> predicate, @NonNull BiFunction<? super java.lang.Long, ? super java.lang.Throwable, ParallelFailureHandling> errorHandler)
- Filters the source values on each 'rail' and handles errors based on the value returned by the handler function.
@NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super @NonNull T> predicate, @NonNull ParallelFailureHandling errorHandler)
- Filters the source values on each 'rail' and handles errors based on the given ParallelFailureHandling enumeration value.
<@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper)
- Generates and flattens Publishers on each 'rail'.
<@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean delayError)
- Generates and flattens Publishers on each 'rail', optionally delaying errors.
<@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean delayError, int maxConcurrency)
- Generates and flattens Publishers on each 'rail', optionally delaying errors and capping the total number of simultaneous subscriptions to the inner Publishers.
<@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull Function<? super @NonNull T, ? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
- Generates and flattens Publishers on each 'rail', optionally delaying errors, capping the total number of simultaneous subscriptions to the inner Publishers and using the given prefetch amount for the inner Publishers.
<@NonNull U> @NonNull ParallelFlowable<U> flatMapIterable(@NonNull Function<? super @NonNull T, ? extends java.lang.Iterable<? extends @NonNull U>> mapper)
- Returns a ParallelFlowable that merges each item emitted by the source on each rail with the values in an Iterable corresponding to that item that is generated by a selector.
<@NonNull U> @NonNull ParallelFlowable<U> flatMapIterable(@NonNull Function<? super @NonNull T, ? extends java.lang.Iterable<? extends @NonNull U>> mapper, int bufferSize)
- Returns a ParallelFlowable that merges each item emitted by the source ParallelFlowable with the values in an Iterable corresponding to that item that is generated by a selector.
<@NonNull R> @NonNull ParallelFlowable<R> flatMapStream(@NonNull Function<? super @NonNull T, ? extends java.util.stream.Stream<? extends @NonNull R>> mapper)
- Maps each upstream item on each rail into a Stream and emits the Stream's items to the downstream in a sequential fashion.
<@NonNull R> @NonNull ParallelFlowable<R> flatMapStream(@NonNull Function<? super @NonNull T, ? extends java.util.stream.Stream<? extends @NonNull R>> mapper, int prefetch)
- Maps each upstream item of each rail into a Stream and emits the Stream's items to the downstream in a sequential fashion.
static <@NonNull T> @NonNull ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends @NonNull T> source)
- Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion.
static <@NonNull T> @NonNull ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends @NonNull T> source, int parallelism)
- Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
static <@NonNull T> @NonNull ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends @NonNull T> source, int parallelism, int prefetch)
- Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values.
static <@NonNull T> @NonNull ParallelFlowable<T> fromArray(@NonNull org.reactivestreams.Publisher<@NonNull T>... publishers)
- Wraps multiple Publishers into a ParallelFlowable which runs them in parallel and unordered.
<@NonNull R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super @NonNull T, ? extends @NonNull R> mapper)
- Maps the source values on each 'rail' to another value.
<@NonNull R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super @NonNull T, ? extends @NonNull R> mapper, @NonNull BiFunction<? super java.lang.Long, ? super java.lang.Throwable, ParallelFailureHandling> errorHandler)
- Maps the source values on each 'rail' to another value and handles errors based on the value returned by the handler function.
<@NonNull R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super @NonNull T, ? extends @NonNull R> mapper, @NonNull ParallelFailureHandling errorHandler)
- Maps the source values on each 'rail' to another value and handles errors based on the given ParallelFailureHandling enumeration value.
<@NonNull R> @NonNull ParallelFlowable<R> mapOptional(@NonNull Function<? super @NonNull T, @NonNull java.util.Optional<? extends @NonNull R>> mapper)
- Maps the source values on each 'rail' to an Optional and emits its value if any.
<@NonNull R> @NonNull ParallelFlowable<R> mapOptional(@NonNull Function<? super @NonNull T, @NonNull java.util.Optional<? extends @NonNull R>> mapper, @NonNull BiFunction<? super java.lang.Long, ? super java.lang.Throwable, ParallelFailureHandling> errorHandler)
- Maps the source values on each 'rail' to an Optional and emits its value if any, handling errors based on the value returned by the handler function.
<@NonNull R> @NonNull ParallelFlowable<R> mapOptional(@NonNull Function<? super @NonNull T, @NonNull java.util.Optional<? extends @NonNull R>> mapper, @NonNull ParallelFailureHandling errorHandler)
- Maps the source values on each 'rail' to an Optional and emits its value if any, handling errors based on the given ParallelFailureHandling enumeration value.
abstract int parallelism()
- Returns the number of expected parallel Subscribers.
@NonNull Flowable<T> reduce(@NonNull BiFunction<@NonNull T, @NonNull T, @NonNull T> reducer)
- Reduces all values within a 'rail' and across 'rails' with a reducer function into one Flowable sequence.
<@NonNull R> @NonNull ParallelFlowable<R> reduce(@NonNull Supplier<@NonNull R> initialSupplier, @NonNull BiFunction<@NonNull R, ? super @NonNull T, @NonNull R> reducer)
- Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.
@NonNull ParallelFlowable<T> runOn(@NonNull Scheduler scheduler)
- Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with no work-stealing and the default prefetch amount.
@NonNull ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch)
- Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with possible work-stealing and a given prefetch amount.
@NonNull Flowable<T> sequential()
- Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails.
@NonNull Flowable<T> sequential(int prefetch)
- Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails.
@NonNull Flowable<T> sequentialDelayError()
- Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails and delaying errors from all rails till all terminate.
@NonNull Flowable<T> sequentialDelayError(int prefetch)
- Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails and delaying errors from all rails till all terminate.
@NonNull Flowable<T> sorted(@NonNull java.util.Comparator<? super @NonNull T> comparator)
- Sorts the 'rails' of this ParallelFlowable and returns a Flowable that sequentially picks the smallest next value from the rails.
@NonNull Flowable<T> sorted(@NonNull java.util.Comparator<? super @NonNull T> comparator, int capacityHint)
- Sorts the 'rails' of this ParallelFlowable and returns a Flowable that sequentially picks the smallest next value from the rails.
abstract void subscribe(@NonNull org.reactivestreams.Subscriber<? super @NonNull T>[] subscribers)
- Subscribes an array of Subscribers to this ParallelFlowable and triggers the execution chain for all 'rails'.
<@NonNull R> R to(@NonNull ParallelFlowableConverter<@NonNull T, @NonNull R> converter)
- Calls the specified converter function during assembly time and returns its resulting value.
@NonNull Flowable<java.util.List<T>> toSortedList(@NonNull java.util.Comparator<? super @NonNull T> comparator)
- Sorts the 'rails' according to the comparator and returns a full sorted List as a Flowable.
@NonNull Flowable<@NonNull java.util.List<T>> toSortedList(@NonNull java.util.Comparator<? super @NonNull T> comparator, int capacityHint)
- Sorts the 'rails' according to the comparator and returns a full sorted List as a Flowable.
protected boolean validate(@NonNull org.reactivestreams.Subscriber<?>[] subscribers)
- Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlowable.
-
-
-
Method Detail
-
subscribe
@BackpressureSupport(SPECIAL) @SchedulerSupport("none") public abstract void subscribe(@NonNull org.reactivestreams.Subscriber<? super @NonNull T>[] subscribers)
Subscribes an array of Subscribers to this ParallelFlowable and triggers the execution chain for all 'rails'.
- Backpressure:
- The backpressure behavior/expectation is determined by the supplied Subscriber.
- Scheduler:
- subscribe does not operate by default on a particular Scheduler.
- Parameters:
- subscribers - the subscribers array to run in parallel; the number of items must equal the parallelism level of this ParallelFlowable
- Throws:
- java.lang.NullPointerException - if subscribers is null
- See Also:
- parallelism()
-
parallelism
@CheckReturnValue public abstract int parallelism()
Returns the number of expected parallel Subscribers.
- Returns:
- the number of expected parallel Subscribers
-
validate
protected final boolean validate(@NonNull org.reactivestreams.Subscriber<?>[] subscribers)
Validates the number of subscribers and returns true if their number matches the parallelism level of this ParallelFlowable.
- Parameters:
- subscribers - the array of Subscribers
- Returns:
- true if the number of subscribers equals the parallelism level
- Throws:
- java.lang.NullPointerException - if subscribers is null
- java.lang.IllegalArgumentException - if subscribers.length differs from parallelism()
-
from
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(FULL) public static <@NonNull T> @NonNull ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends @NonNull T> source)
Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs) in a round-robin fashion.
- Backpressure:
- The operator honors the backpressure of the parallel rails and requests Flowable.bufferSize() amount from the upstream, followed by 75% of that amount requested after every 75% received.
- Scheduler:
- from does not operate by default on a particular Scheduler.
- Type Parameters:
- T - the value type
- Parameters:
- source - the source Publisher
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if source is null
-
from
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(FULL) public static <@NonNull T> @NonNull ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends @NonNull T> source, int parallelism)
Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
- Backpressure:
- The operator honors the backpressure of the parallel rails and requests Flowable.bufferSize() amount from the upstream, followed by 75% of that amount requested after every 75% received.
- Scheduler:
- from does not operate by default on a particular Scheduler.
- Type Parameters:
- T - the value type
- Parameters:
- source - the source Publisher
- parallelism - the number of parallel rails
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if source is null
- java.lang.IllegalArgumentException - if parallelism is non-positive
-
from
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(FULL) public static <@NonNull T> @NonNull ParallelFlowable<T> from(@NonNull org.reactivestreams.Publisher<? extends @NonNull T> source, int parallelism, int prefetch)
Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion, using a custom prefetch amount and queue for dealing with the source Publisher's values.
- Backpressure:
- The operator honors the backpressure of the parallel rails and requests the prefetch amount from the upstream, followed by 75% of that amount requested after every 75% received.
- Scheduler:
- from does not operate by default on a particular Scheduler.
- Type Parameters:
- T - the value type
- Parameters:
- source - the source Publisher
- parallelism - the number of parallel rails
- prefetch - the number of values to prefetch from the source until there is a rail ready to process it
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if source is null
- java.lang.IllegalArgumentException - if parallelism or prefetch is non-positive
-
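The three `from` overloads above can be compared in a short sketch. This is an illustration rather than part of the official documentation; the class name and the parallelism/prefetch values are arbitrary:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFlowable;

public class FromExample {
    public static void main(String[] args) {
        // from(source): one rail per CPU, default prefetch.
        ParallelFlowable<Integer> byCpu = ParallelFlowable.from(Flowable.range(1, 8));

        // from(source, parallelism, prefetch): two rails, each requesting
        // 4 items at a time from the source in round-robin fashion.
        ParallelFlowable<Integer> rails = ParallelFlowable.from(Flowable.range(1, 8), 2, 4);

        System.out.println(rails.parallelism()); // 2

        // All 8 source values still reach the merged sequence.
        long count = rails.sequential().count().blockingGet();
        System.out.println(count); // 8
    }
}
```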
map
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final <@NonNull R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super @NonNull T, ? extends @NonNull R> mapper)
Maps the source values on each 'rail' to another value.
Note that the same mapper function may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
- map does not operate by default on a particular Scheduler.
- Type Parameters:
- R - the output value type
- Parameters:
- mapper - the mapper function turning Ts into Rs
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if mapper is null
-
map
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final <@NonNull R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super @NonNull T, ? extends @NonNull R> mapper, @NonNull ParallelFailureHandling errorHandler)
Maps the source values on each 'rail' to another value and handles errors based on the given ParallelFailureHandling enumeration value.
Note that the same mapper function may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
- map does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental
- Type Parameters:
- R - the output value type
- Parameters:
- mapper - the mapper function turning Ts into Rs
- errorHandler - the enumeration that defines how to handle errors thrown from the mapper function
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if mapper or errorHandler is null
- Since:
- 2.2
-
map
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final <@NonNull R> @NonNull ParallelFlowable<R> map(@NonNull Function<? super @NonNull T, ? extends @NonNull R> mapper, @NonNull BiFunction<? super java.lang.Long, ? super java.lang.Throwable, ParallelFailureHandling> errorHandler)
Maps the source values on each 'rail' to another value and handles errors based on the value returned by the handler function.
Note that the same mapper function may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
- map does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental
- Type Parameters:
- R - the output value type
- Parameters:
- mapper - the mapper function turning Ts into Rs
- errorHandler - the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if mapper or errorHandler is null
- Since:
- 2.2
-
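The error-handling `map` overloads above can be sketched as follows. This is an illustration, not official documentation; the class name and the failing expression are invented for the example. `SKIP` drops the element whose mapping failed instead of terminating the rail:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFailureHandling;

import java.util.List;

public class MapErrorHandlingExample {
    public static void main(String[] args) {
        // 12 / (v - 3) throws ArithmeticException for v == 3;
        // ParallelFailureHandling.SKIP drops that element.
        List<Integer> result = Flowable.range(1, 5)
                .parallel()
                .map(v -> 12 / (v - 3), ParallelFailureHandling.SKIP)
                .sequential()
                .toList()
                .blockingGet();

        System.out.println(result.size()); // 4: the v == 3 element was skipped
    }
}
```

The `BiFunction` overload works the same way but lets the handler inspect the retry count and the `Throwable` before choosing a `ParallelFailureHandling` value.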
filter
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final @NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super @NonNull T> predicate)
Filters the source values on each 'rail'.
Note that the same predicate may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
- filter does not operate by default on a particular Scheduler.
- Parameters:
- predicate - the function returning true to keep a value or false to drop a value
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if predicate is null
-
filter
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final @NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super @NonNull T> predicate, @NonNull ParallelFailureHandling errorHandler)
Filters the source values on each 'rail' and handles errors based on the given ParallelFailureHandling enumeration value.
Note that the same predicate may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
- filter does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental
- Parameters:
- predicate - the function returning true to keep a value or false to drop a value
- errorHandler - the enumeration that defines how to handle errors thrown from the predicate
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if predicate or errorHandler is null
- Since:
- 2.2
-
filter
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final @NonNull ParallelFlowable<T> filter(@NonNull Predicate<? super @NonNull T> predicate, @NonNull BiFunction<? super java.lang.Long, ? super java.lang.Throwable, ParallelFailureHandling> errorHandler)
Filters the source values on each 'rail' and handles errors based on the value returned by the handler function.
Note that the same predicate may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
- filter does not operate by default on a particular Scheduler.
History: 2.0.8 - experimental
- Parameters:
- predicate - the function returning true to keep a value or false to drop a value
- errorHandler - the function called with the current repeat count and failure Throwable; it should return one of the ParallelFailureHandling enumeration values to indicate how to proceed
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if predicate or errorHandler is null
- Since:
- 2.2
-
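A minimal sketch of `filter` on rails; the class name and values are made up for the example, and the predicate here is trivial (with `runOn()` in the chain it could be invoked from several threads concurrently, as the documentation above notes):

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.List;

public class FilterExample {
    public static void main(String[] args) {
        // Keep only the even values; each rail applies the predicate
        // independently to the items routed to it.
        List<Integer> evens = Flowable.range(1, 10)
                .parallel()
                .filter(v -> v % 2 == 0)
                .sequential()
                .toList()
                .blockingGet();

        System.out.println(evens.size()); // 5
    }
}
```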
runOn
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("custom") public final @NonNull ParallelFlowable<T> runOn(@NonNull Scheduler scheduler)
Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with no work-stealing and the default prefetch amount.
This operator uses the default prefetch size returned by Flowable.bufferSize().
The operator will call Scheduler.createWorker() as many times as this ParallelFlowable's parallelism level.
No assumptions are made about the Scheduler's parallelism level; if the Scheduler's parallelism level is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
- Backpressure:
- The operator honors the backpressure of the parallel rails and requests Flowable.bufferSize() amount from the upstream, followed by 75% of that amount requested after every 75% received.
- Scheduler:
- runOn drains the upstream rails on the specified Scheduler's Workers.
- Parameters:
- scheduler - the scheduler to use
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if scheduler is null
-
runOn
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("custom") public final @NonNull ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch)
Specifies where each 'rail' will observe its incoming values, specified via a Scheduler, with possible work-stealing and a given prefetch amount.
The operator will call Scheduler.createWorker() as many times as this ParallelFlowable's parallelism level.
No assumptions are made about the Scheduler's parallelism level; if the Scheduler's parallelism level is lower than the ParallelFlowable's, some rails may end up on the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own built-in trampolining logic.
- Backpressure:
- The operator honors the backpressure of the parallel rails and requests the prefetch amount from the upstream, followed by 75% of that amount requested after every 75% received.
- Scheduler:
- runOn drains the upstream rails on the specified Scheduler's Workers.
- Parameters:
- scheduler - the scheduler to use
- prefetch - the number of values to request on each 'rail' from the source
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if scheduler is null
- java.lang.IllegalArgumentException - if prefetch is non-positive
-
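The `runOn` overloads above can be illustrated as follows. This is a sketch, not official documentation; the class name, rail count, and prefetch value are arbitrary choices for the example:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class RunOnExample {
    public static void main(String[] args) {
        // Two rails, each observing its values on a computation-scheduler
        // worker; a small prefetch (here 1) trades throughput for lower
        // per-rail buffering.
        List<String> threads = Flowable.range(1, 4)
                .parallel(2)
                .runOn(Schedulers.computation(), 1)
                .map(v -> Thread.currentThread().getName())
                .sequential()
                .toList()
                .blockingGet();

        // Four results; typically (not guaranteed) two distinct worker
        // thread names appear, one per rail.
        System.out.println(threads.size()); // 4
    }
}
```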
reduce
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final @NonNull Flowable<T> reduce(@NonNull BiFunction<@NonNull T, @NonNull T, @NonNull T> reducer)
Reduces all values within a 'rail' and across 'rails' with a reducer function into one Flowable sequence.
Note that the same reducer function may be called from multiple threads concurrently.
- Backpressure:
- The operator honors backpressure from the downstream and consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
- Scheduler:
- reduce does not operate by default on a particular Scheduler.
- Parameters:
- reducer - the function to reduce two values into one
- Returns:
- the new Flowable instance emitting the reduced value, or empty if the current ParallelFlowable is empty
- Throws:
- java.lang.NullPointerException - if reducer is null
-
reduce
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> reduce(@NonNull Supplier<@NonNull R> initialSupplier, @NonNull BiFunction<@NonNull R, ? super @NonNull T, @NonNull R> reducer)
Reduces all values within a 'rail' to a single value (with a possibly different type) via a reducer function that is initialized on each rail from an initialSupplier value.
Note that the same reducer function may be called from multiple threads concurrently.
- Backpressure:
- The operator honors backpressure from the downstream rails and consumes the upstream rails in an unbounded manner (requesting Long.MAX_VALUE).
- Scheduler:
- reduce does not operate by default on a particular Scheduler.
- Type Parameters:
- R - the reduced output type
- Parameters:
- initialSupplier - the supplier for the initial value
- reducer - the function to reduce a previous output of reduce (or the initial value supplied) with a current source value
- Returns:
- the new ParallelFlowable instance
- Throws:
- java.lang.NullPointerException - if initialSupplier or reducer is null
-
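The two `reduce` overloads above compose naturally: the `Supplier`/`BiFunction` form produces one rail-local partial result per rail, and the plain `BiFunction` form then merges those partials into a single `Flowable` value. A sketch (the class name and values are invented for the example):

```java
import io.reactivex.rxjava3.core.Flowable;

public class ReduceExample {
    public static void main(String[] args) {
        // Each rail accumulates its own sum starting from 0, then the
        // cross-rail reduce combines the partial sums into one total.
        int total = Flowable.range(1, 100)
                .parallel()
                .reduce(() -> 0, (sum, v) -> sum + v)  // per-rail partial sums
                .reduce((a, b) -> a + b)               // combine the rails
                .blockingSingle();

        System.out.println(total); // 5050
    }
}
```

Because the per-rail reducer is initialized from a fresh `initialSupplier` value on each rail, it needs no synchronization; only the final cross-rail combine sees values from different rails.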
sequential
@BackpressureSupport(FULL) @SchedulerSupport("none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequential()
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails.
This operator uses the default prefetch size returned by Flowable.bufferSize().
- Backpressure:
- The operator honors backpressure from the downstream and requests Flowable.bufferSize() amount from each rail, then requests from each rail 75% of this amount after 75% received.
- Scheduler:
- sequential does not operate by default on a particular Scheduler.
- Returns:
- the new Flowable instance
- See Also:
- sequential(int), sequentialDelayError()
-
sequential
@BackpressureSupport(FULL) @SchedulerSupport("none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequential(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a given prefetch value for the rails.
- Backpressure:
- The operator honors backpressure from the downstream and requests the prefetch amount from each rail, then requests from each rail 75% of this amount after 75% received.
- Scheduler:
- sequential does not operate by default on a particular Scheduler.
- Parameters:
- prefetch - the prefetch amount to use for each rail
- Returns:
- the new Flowable instance
- Throws:
- java.lang.IllegalArgumentException - if prefetch is non-positive
- See Also:
- sequential(), sequentialDelayError(int)
-
sequentialDelayError
@BackpressureSupport(FULL) @SchedulerSupport("none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequentialDelayError()
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regular Flowable sequence, running with a default prefetch value for the rails and delaying errors from all rails till all terminate.
This operator uses the default prefetch size returned by Flowable.bufferSize().
- Backpressure:
- The operator honors backpressure from the downstream and requests Flowable.bufferSize() amount from each rail, then requests from each rail 75% of this amount after 75% received.
- Scheduler:
- sequentialDelayError does not operate by default on a particular Scheduler.
History: 2.0.7 - experimental
- Returns:
- the new Flowable instance
- Since:
- 2.2
- See Also:
- sequentialDelayError(int), sequential()
-
sequentialDelayError
@BackpressureSupport(FULL) @SchedulerSupport("none") @CheckReturnValue @NonNull public final @NonNull Flowable<T> sequentialDelayError(int prefetch)
Merges the values from each 'rail' in a round-robin or same-order fashion and exposes it as a regularFlowable
sequence, running with a given prefetch value for the rails and delaying errors from all rails till all terminate.- Backpressure:
- The operator honors backpressure from the downstream and
requests the
prefetch
amount from each rail, then requests from each rail 75% of this amount after 75% received. - Scheduler:
sequentialDelayError
does not operate by default on a particularScheduler
.
History: 2.0.7 - experimental
- Parameters:
prefetch
- the prefetch amount to use for each rail- Returns:
- the new
Flowable
instance - Throws:
java.lang.IllegalArgumentException
- ifprefetch
is non-positive- Since:
- 2.2
- See Also:
sequential()
,sequentialDelayError()
-
sorted
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final @NonNull Flowable<T> sorted(@NonNull @NonNull java.util.Comparator<? super @NonNull T> comparator)
Sorts the 'rails' of thisParallelFlowable
and returns aFlowable
that sequentially picks the smallest next value from the rails.This operator requires a finite source
ParallelFlowable
.- Backpressure:
- The operator honors backpressure from the downstream and
consumes the upstream rails in an unbounded manner (requesting
Long.MAX_VALUE
). - Scheduler:
sorted
does not operate by default on a particularScheduler
.
- Parameters:
comparator
- the comparator to use- Returns:
- the new
Flowable
instance - Throws:
java.lang.NullPointerException
- ifcomparator
isnull
-
sorted
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final @NonNull Flowable<T> sorted(@NonNull @NonNull java.util.Comparator<? super @NonNull T> comparator, int capacityHint)
Sorts the 'rails' of thisParallelFlowable
and returns aFlowable
that sequentially picks the smallest next value from the rails.This operator requires a finite source
ParallelFlowable
.- Backpressure:
- The operator honors backpressure from the downstream and
consumes the upstream rails in an unbounded manner (requesting
Long.MAX_VALUE
). - Scheduler:
sorted
does not operate by default on a particularScheduler
.
- Parameters:
comparator
- the comparator to usecapacityHint
- the expected number of total elements- Returns:
- the new
Flowable
instance - Throws:
java.lang.NullPointerException
- ifcomparator
isnull
java.lang.IllegalArgumentException
- ifcapacityHint
is non-positive
-
toSortedList
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final @NonNull Flowable<java.util.List<T>> toSortedList(@NonNull @NonNull java.util.Comparator<? super @NonNull T> comparator)
Sorts the 'rails' according to the comparator and returns a full sortedList
as aFlowable
.This operator requires a finite source
ParallelFlowable
.- Backpressure:
- The operator honors backpressure from the downstream and
consumes the upstream rails in an unbounded manner (requesting
Long.MAX_VALUE
). - Scheduler:
toSortedList
does not operate by default on a particularScheduler
.
- Parameters:
comparator
- the comparator to compare elements- Returns:
- the new
Flowable
instance - Throws:
java.lang.NullPointerException
- ifcomparator
isnull
-
toSortedList
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final @NonNull Flowable<@NonNull java.util.List<T>> toSortedList(@NonNull @NonNull java.util.Comparator<? super @NonNull T> comparator, int capacityHint)
Sorts the 'rails' according to the comparator and returns a full sortedList
as aFlowable
.This operator requires a finite source
ParallelFlowable
.- Backpressure:
- The operator honors backpressure from the downstream and
consumes the upstream rails in an unbounded manner (requesting
Long.MAX_VALUE
). - Scheduler:
toSortedList
does not operate by default on a particularScheduler
.
- Parameters:
comparator
- the comparator to compare elementscapacityHint
- the expected number of total elements- Returns:
- the new
Flowable
instance - Throws:
java.lang.NullPointerException
- ifcomparator
isnull
java.lang.IllegalArgumentException
- ifcapacityHint
is non-positive
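A minimal sketch of `toSortedList` (assuming RxJava 3; the class name is illustrative): each rail sorts its own slice, then the rails are merged into one fully sorted `List`:

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.Comparator;
import java.util.List;

public class ToSortedListExample {
    public static void main(String[] args) {
        // The result is fully sorted no matter how items were split across rails.
        List<Integer> sorted = Flowable.fromArray(5, 1, 4, 2, 3, 6)
                .parallel(2)
                .toSortedList(Comparator.naturalOrder())
                .blockingFirst();
        System.out.println(sorted); // [1, 2, 3, 4, 5, 6]
    }
}
```

Note that both `sorted` and `toSortedList` require a finite source, since the rails must be drained completely before any value can be emitted.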
-
doOnNext
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnNext(@NonNull @NonNull Consumer<? super @NonNull T> onNext)
Call the specified consumer with the current element passing through any 'rail'.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnNext
does not operate by default on a particularScheduler
.
- Parameters:
onNext
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonNext
isnull
-
doOnNext
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnNext(@NonNull @NonNull Consumer<? super @NonNull T> onNext, @NonNull @NonNull ParallelFailureHandling errorHandler)
Call the specified consumer with the current element passing through any 'rail' and handles errors based on the givenParallelFailureHandling
enumeration value.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnNext
does not operate by default on a particularScheduler
.
History: 2.0.8 - experimental
- Parameters:
onNext
- the callbackerrorHandler
- the enumeration that defines how to handle errors thrown from theonNext
consumer- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonNext
orerrorHandler
isnull
- Since:
- 2.2
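For instance, `ParallelFailureHandling.SKIP` drops the item whose callback threw instead of terminating the rail; a sketch assuming RxJava 3, with an illustrative class name:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelFailureHandling;
import java.util.List;

public class DoOnNextSkipExample {
    public static void main(String[] args) {
        // Without the handler, the exception thrown for v == 3 would be
        // signaled as an onError on that rail; SKIP drops the item instead.
        List<Integer> result = Flowable.range(1, 5)
                .parallel(1)
                .doOnNext(v -> {
                    if (v == 3) {
                        throw new IllegalStateException("rejecting " + v);
                    }
                }, ParallelFailureHandling.SKIP)
                .sequential()
                .toList()
                .blockingGet();
        System.out.println(result); // [1, 2, 4, 5]
    }
}
```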
-
doOnNext
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnNext(@NonNull @NonNull Consumer<? super @NonNull T> onNext, @NonNull @NonNull BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Call the specified consumer with the current element passing through any 'rail' and handles errors based on the returned value by the handler function.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnNext
does not operate by default on a particularScheduler
.
History: 2.0.8 - experimental
- Parameters:
onNext
- the callbackerrorHandler
- the function called with the current repeat count and failureThrowable
and should return one of theParallelFailureHandling
enumeration values to indicate how to proceed.- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonNext
orerrorHandler
isnull
- Since:
- 2.2
-
doAfterNext
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doAfterNext(@NonNull @NonNull Consumer<? super @NonNull T> onAfterNext)
Call the specified consumer with the current element passing through any 'rail' after it has been delivered to downstream within the rail.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doAfterNext
does not operate by default on a particularScheduler
.
- Parameters:
onAfterNext
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonAfterNext
isnull
-
doOnError
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnError(@NonNull @NonNull Consumer<? super java.lang.Throwable> onError)
Call the specified consumer with the exception passing through any 'rail'.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnError
does not operate by default on a particularScheduler
.
- Parameters:
onError
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonError
isnull
-
doOnComplete
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnComplete(@NonNull @NonNull Action onComplete)
Run the specifiedAction
when a 'rail' completes.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnComplete
does not operate by default on a particularScheduler
.
- Parameters:
onComplete
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonComplete
isnull
-
doAfterTerminated
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doAfterTerminated(@NonNull @NonNull Action onAfterTerminate)
Run the specifiedAction
when a 'rail' completes or signals an error.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doAfterTerminated
does not operate by default on a particularScheduler
.
- Parameters:
onAfterTerminate
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonAfterTerminate
isnull
-
doOnSubscribe
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnSubscribe(@NonNull @NonNull Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Call the specified callback when a 'rail' receives aSubscription
from its upstream.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnSubscribe
does not operate by default on a particularScheduler
.
- Parameters:
onSubscribe
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonSubscribe
isnull
-
doOnRequest
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final @NonNull ParallelFlowable<T> doOnRequest(@NonNull @NonNull LongConsumer onRequest)
Call the specified consumer with the request amount if any rail receives a request.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnRequest
does not operate by default on a particularScheduler
.
- Parameters:
onRequest
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonRequest
isnull
-
doOnCancel
@BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") @CheckReturnValue @NonNull public final @NonNull ParallelFlowable<T> doOnCancel(@NonNull @NonNull Action onCancel)
Run the specifiedAction
when a 'rail' receives a cancellation.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
doOnCancel
does not operate by default on a particularScheduler
.
- Parameters:
onCancel
- the callback- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifonCancel
isnull
-
collect
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final <@NonNull C> @NonNull ParallelFlowable<C> collect(@NonNull @NonNull Supplier<? extends @NonNull C> collectionSupplier, @NonNull @NonNull BiConsumer<? super @NonNull C,? super @NonNull T> collector)
Collect the elements in each rail into a collection supplied via acollectionSupplier
and collected into with a collector action, emitting the collection at the end.- Backpressure:
- The operator honors backpressure from the downstream rails and
consumes the upstream rails in an unbounded manner (requesting
Long.MAX_VALUE
). - Scheduler:
collect
does not operate by default on a particularScheduler
.
- Type Parameters:
C
- the collection type- Parameters:
collectionSupplier
- the supplier of the collection in each railcollector
- the collector, taking the per-rail collection and the current item- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifcollectionSupplier
orcollector
isnull
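A sketch of the per-rail collection behavior (assuming RxJava 3; the class name is illustrative): each rail fills its own container, and `sequential()` then emits one container per rail:

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;

public class CollectExample {
    public static void main(String[] args) {
        // Six items split round-robin over two rails; each rail emits
        // exactly one ArrayList holding its share of the items.
        List<ArrayList<Integer>> perRail = Flowable.range(1, 6)
                .parallel(2)
                .collect(() -> new ArrayList<Integer>(), ArrayList::add)
                .sequential()
                .toList()
                .blockingGet();
        System.out.println(perRail.size()); // 2 (one collection per rail)
    }
}
```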
-
fromArray
@CheckReturnValue @NonNull @SafeVarargs @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public static <@NonNull T> @NonNull ParallelFlowable<T> fromArray(@NonNull @NonNull org.reactivestreams.Publisher<@NonNull T>... publishers)
Wraps multiplePublisher
s into aParallelFlowable
which runs them in parallel and unordered.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
fromArray
does not operate by default on a particularScheduler
.
- Type Parameters:
T
- the value type- Parameters:
publishers
- the array of publishers- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifpublishers
isnull
java.lang.IllegalArgumentException
- ifpublishers
is an empty array
-
to
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final <@NonNull R> R to(@NonNull @NonNull ParallelFlowableConverter<@NonNull T,@NonNull R> converter)
Calls the specified converter function during assembly time and returns its resulting value.This allows fluent conversion to any other type.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by how the converter function composes over the upstream source.
- Scheduler:
to
does not operate by default on a particularScheduler
.
History: 2.1.7 - experimental
- Type Parameters:
R
- the resulting object type- Parameters:
converter
- the function that receives the currentParallelFlowable
instance and returns a value- Returns:
- the converted value
- Throws:
java.lang.NullPointerException
- ifconverter
isnull
- Since:
- 2.2
-
compose
@CheckReturnValue @NonNull @BackpressureSupport(PASS_THROUGH) @SchedulerSupport("none") public final <@NonNull U> @NonNull ParallelFlowable<U> compose(@NonNull @NonNull ParallelTransformer<@NonNull T,@NonNull U> composer)
Allows composing operators, in assembly time, on top of thisParallelFlowable
and returns anotherParallelFlowable
with composed features.- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by how the converter function composes over the upstream source.
- Scheduler:
compose
does not operate by default on a particularScheduler
.
- Type Parameters:
U
- the output value type- Parameters:
composer
- the composer function fromParallelFlowable
(this) to anotherParallelFlowable
- Returns:
- the
ParallelFlowable
returned by the function - Throws:
java.lang.NullPointerException
- ifcomposer
isnull
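A `ParallelTransformer` can bundle a reusable chain of rail operators; a minimal sketch assuming RxJava 3 (class and method names are illustrative):

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.parallel.ParallelTransformer;
import java.util.List;

public class ComposeExample {
    // A reusable transformer bundling a filter and a map step.
    static ParallelTransformer<Integer, String> evensAsText() {
        return upstream -> upstream
                .filter(v -> v % 2 == 0)
                .map(v -> "#" + v);
    }

    public static void main(String[] args) {
        // parallel(1) keeps the order deterministic for the demo.
        List<String> out = Flowable.range(1, 6)
                .parallel(1)
                .compose(evensAsText())
                .sequential()
                .toList()
                .blockingGet();
        System.out.println(out); // [#2, #4, #6]
    }
}
```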
-
flatMap
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper)
Generates and flattensPublisher
s on each 'rail'.Errors are not delayed and the operator uses unbounded concurrency along with the default inner prefetch.
- Backpressure:
- The operator honors backpressure from the downstream rails and
requests
Flowable.bufferSize()
amount from each rail upfront, then requests one more item from a rail each time an inner source on that rail completes. The inner sources are requestedFlowable.bufferSize()
amount upfront, then 75% of this amount requested after 75% received. - Scheduler:
flatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
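A minimal `flatMap` sketch (assuming RxJava 3; the class name is illustrative). With multiple rails the merge order is not guaranteed, so the example only counts the flattened items:

```java
import io.reactivex.rxjava3.core.Flowable;

public class FlatMapExample {
    public static void main(String[] args) {
        // Each value expands to itself and its negation; 4 inputs over
        // 2 rails yield 8 items in an unspecified interleaving.
        long count = Flowable.range(1, 4)
                .parallel(2)
                .flatMap(v -> Flowable.just(v, -v))
                .sequential()
                .count()
                .blockingGet();
        System.out.println(count); // 8
    }
}
```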
-
flatMap
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean delayError)
Generates and flattensPublisher
s on each 'rail', optionally delaying errors.It uses unbounded concurrency along with the default inner prefetch.
- Backpressure:
- The operator honors backpressure from the downstream rails and
requests
Flowable.bufferSize()
amount from each rail upfront, then requests one more item from a rail each time an inner source on that rail completes. The inner sources are requestedFlowable.bufferSize()
amount upfront, then 75% of this amount requested after 75% received. - Scheduler:
flatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
delayError
- should the errors from the main and the inner sources be delayed till all of them terminate?- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
-
flatMap
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean delayError, int maxConcurrency)
Generates and flattensPublisher
s on each 'rail', optionally delaying errors and having a total number of simultaneous subscriptions to the innerPublisher
s.It uses a default inner prefetch.
- Backpressure:
- The operator honors backpressure from the downstream rails and
requests
maxConcurrency
amount from each rail upfront, then requests one more item from a rail each time an inner source on that rail completes. The inner sources are requestedFlowable.bufferSize()
amount upfront, then 75% of this amount requested after 75% received. - Scheduler:
flatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
delayError
- should the errors from the main and the inner sources be delayed till all of them terminate?maxConcurrency
- the maximum number of simultaneous subscriptions to the generated innerPublisher
s- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
java.lang.IllegalArgumentException
- ifmaxConcurrency
is non-positive
-
flatMap
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> flatMap(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean delayError, int maxConcurrency, int prefetch)
Generates and flattensPublisher
s on each 'rail', optionally delaying errors, having a total number of simultaneous subscriptions to the innerPublisher
s and using the given prefetch amount for the innerPublisher
s.- Backpressure:
- The operator honors backpressure from the downstream rails and
requests
maxConcurrency
amount from each rail upfront, then requests one more item from a rail each time an inner source on that rail completes. The inner sources are requested theprefetch
amount upfront, then 75% of this amount requested after 75% received. - Scheduler:
flatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
delayError
- should the errors from the main and the inner sources be delayed till all of them terminate?maxConcurrency
- the maximum number of simultaneous subscriptions to the generated innerPublisher
sprefetch
- the number of items to prefetch from each innerPublisher
- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
java.lang.IllegalArgumentException
- ifmaxConcurrency
orprefetch
is non-positive
-
concatMap
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> concatMap(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper)
Generates and concatenatesPublisher
s on each 'rail', signalling errors immediately and generating 2 publishers upfront.- Backpressure:
- The operator honors backpressure from the downstream rails and requests 2 from each rail upfront and keeps requesting 1 when an inner source completes. Requests for the inner sources are determined by the downstream rails' backpressure behavior.
- Scheduler:
concatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
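Unlike `flatMap`, `concatMap` preserves the per-rail order: each inner `Publisher` is consumed fully before the next one on the same rail starts. A sketch assuming RxJava 3 (class name illustrative); `parallel(1)` keeps the demo deterministic:

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;

public class ConcatMapExample {
    public static void main(String[] args) {
        // Each value v expands to (v*10, v*10+1), concatenated in order.
        List<Integer> out = Flowable.range(1, 3)
                .parallel(1)
                .concatMap(v -> Flowable.just(v * 10, v * 10 + 1))
                .sequential()
                .toList()
                .blockingGet();
        System.out.println(out); // [10, 11, 20, 21, 30, 31]
    }
}
```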
-
concatMap
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> concatMap(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, int prefetch)
Generates and concatenatesPublisher
s on each 'rail', signalling errors immediately and using the given prefetch amount for generatingPublisher
s upfront.- Backpressure:
- The operator honors backpressure from the downstream rails and
requests the
prefetch
amount from each rail upfront and keeps requesting 75% of this amount after 75% received and the inner sources completed. Requests for the inner sources are determined by the downstream rails' backpressure behavior. - Scheduler:
concatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
prefetch
- the number of items to prefetch from each innerPublisher
- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
java.lang.IllegalArgumentException
- ifprefetch
is non-positive
-
concatMapDelayError
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> concatMapDelayError(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, boolean tillTheEnd)
Generates and concatenatesPublisher
s on each 'rail', optionally delaying errors and generating 2 publishers upfront.- Backpressure:
- The operator honors backpressure from the downstream rails and requests 2 from each rail upfront and keeps requesting 1 when an inner source completes. Requests for the inner sources are determined by the downstream rails' backpressure behavior.
- Scheduler:
concatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
tillTheEnd
- iftrue
, all errors from the upstream and innerPublisher
s are delayed till all of them terminate, iffalse
, the error is emitted when an innerPublisher
terminates.- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
-
concatMapDelayError
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull R> @NonNull ParallelFlowable<R> concatMapDelayError(@NonNull @NonNull Function<? super @NonNull T,? extends org.reactivestreams.Publisher<? extends @NonNull R>> mapper, int prefetch, boolean tillTheEnd)
Generates and concatenatesPublisher
s on each 'rail', optionally delaying errors and using the given prefetch amount for generatingPublisher
s upfront.- Backpressure:
- The operator honors backpressure from the downstream rails and
requests the
prefetch
amount from each rail upfront and keeps requesting 75% of this amount after 75% received and the inner sources completed. Requests for the inner sources are determined by the downstream rails' backpressure behavior. - Scheduler:
concatMap
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the result type- Parameters:
mapper
- the function to map each rail's value into aPublisher
prefetch
- the number of items to prefetch from each innerPublisher
tillTheEnd
- iftrue
, all errors from the upstream and innerPublisher
s are delayed till all of them terminate, iffalse
, the error is emitted when an innerPublisher
terminates.- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
java.lang.IllegalArgumentException
- ifprefetch
is non-positive
-
flatMapIterable
@CheckReturnValue @BackpressureSupport(FULL) @SchedulerSupport("none") @NonNull public final <@NonNull U> @NonNull ParallelFlowable<U> flatMapIterable(@NonNull @NonNull Function<? super @NonNull T,? extends java.lang.Iterable<? extends @NonNull U>> mapper)
Returns aParallelFlowable
that merges each item emitted by the source on each rail with the values in anIterable
corresponding to that item that is generated by a selector.- Backpressure:
- The operator honors backpressure from each downstream rail. The source
ParallelFlowable
is expected to honor backpressure as well. If the sourceParallelFlowable
violates the rule, the operator will signal aMissingBackpressureException
. - Scheduler:
flatMapIterable
does not operate by default on a particularScheduler
.
- Type Parameters:
U
- the type of item emitted by the resultingIterable
- Parameters:
mapper
- a function that returns anIterable
sequence of values for an item emitted by the sourceParallelFlowable
- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
- Since:
- 3.0.0
- See Also:
- ReactiveX operators documentation: FlatMap,
flatMapStream(Function)
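A minimal `flatMapIterable` sketch (assuming RxJava 3; the class name is illustrative), where each value is replaced by an `Iterable` of that many copies of itself:

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.Arrays;
import java.util.List;

public class FlatMapIterableExample {
    public static void main(String[] args) {
        // parallel(1) keeps the expansion order deterministic for the demo.
        List<Integer> out = Flowable.just(1, 2, 3)
                .parallel(1)
                .flatMapIterable(v -> {
                    Integer[] copies = new Integer[v];
                    Arrays.fill(copies, v);      // v copies of v
                    return Arrays.asList(copies);
                })
                .sequential()
                .toList()
                .blockingGet();
        System.out.println(out); // [1, 2, 2, 3, 3, 3]
    }
}
```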
-
flatMapIterable
@CheckReturnValue @NonNull @BackpressureSupport(FULL) @SchedulerSupport("none") public final <@NonNull U> @NonNull ParallelFlowable<U> flatMapIterable(@NonNull @NonNull Function<? super @NonNull T,? extends java.lang.Iterable<? extends @NonNull U>> mapper, int bufferSize)
Returns aParallelFlowable
that merges each item emitted by the sourceParallelFlowable
with the values in anIterable
corresponding to that item that is generated by a selector.- Backpressure:
- The operator honors backpressure from each downstream rail. The source
ParallelFlowable
is expected to honor backpressure as well. If the sourceParallelFlowable
violates the rule, the operator will signal aMissingBackpressureException
. - Scheduler:
flatMapIterable
does not operate by default on a particularScheduler
.
- Type Parameters:
U
- the type of item emitted by the resultingIterable
- Parameters:
mapper
- a function that returns anIterable
sequence of values for an item emitted by the sourceParallelFlowable
bufferSize
- the number of elements to prefetch from each upstream rail- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
java.lang.IllegalArgumentException
- ifbufferSize
is non-positive- Since:
- 3.0.0
- See Also:
- ReactiveX operators documentation: FlatMap,
flatMapStream(Function, int)
-
mapOptional
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final <@NonNull R> @NonNull ParallelFlowable<R> mapOptional(@NonNull @NonNull Function<? super @NonNull T,@NonNull java.util.Optional<? extends @NonNull R>> mapper)
Maps the source values on each 'rail' to an optional and emits its value if any.Note that the same mapper function may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
mapOptional
does not operate by default on a particularScheduler
.
- Type Parameters:
R
- the output value type- Parameters:
mapper
- the mapper function turning Ts into optional of Rs.- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
isnull
- Since:
- 3.0.0
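`mapOptional` combines a filter and a map in one step: empty `Optional`s are silently dropped. A sketch assuming RxJava 3 (class name illustrative):

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
import java.util.Optional;

public class MapOptionalExample {
    public static void main(String[] args) {
        // Keep only even values, squared; odd values map to Optional.empty()
        // and produce no downstream item.
        List<Integer> out = Flowable.range(1, 6)
                .parallel(1)
                .mapOptional(v -> v % 2 == 0
                        ? Optional.of(v * v)
                        : Optional.<Integer>empty())
                .sequential()
                .toList()
                .blockingGet();
        System.out.println(out); // [4, 16, 36]
    }
}
```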
-
mapOptional
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final <@NonNull R> @NonNull ParallelFlowable<R> mapOptional(@NonNull @NonNull Function<? super @NonNull T,@NonNull java.util.Optional<? extends @NonNull R>> mapper, @NonNull @NonNull ParallelFailureHandling errorHandler)
Maps the source values on each 'rail' to an optional and emits its value if any and handles errors based on the givenParallelFailureHandling
enumeration value.Note that the same mapper function may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
mapOptional
does not operate by default on a particularScheduler
.
History: 2.0.8 - experimental
- Type Parameters:
R
- the output value type- Parameters:
mapper
- the mapper function turning Ts into optional of Rs.errorHandler
- the enumeration that defines how to handle errors thrown from the mapper function- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
orerrorHandler
isnull
- Since:
- 3.0.0
-
mapOptional
@CheckReturnValue @NonNull @SchedulerSupport("none") @BackpressureSupport(PASS_THROUGH) public final <@NonNull R> @NonNull ParallelFlowable<R> mapOptional(@NonNull @NonNull Function<? super @NonNull T,@NonNull java.util.Optional<? extends @NonNull R>> mapper, @NonNull @NonNull BiFunction<? super java.lang.Long,? super java.lang.Throwable,ParallelFailureHandling> errorHandler)
Maps the source values on each 'rail' to an optional and emits its value if any and handles errors based on the returned value by the handler function.Note that the same mapper function may be called from multiple threads concurrently.
- Backpressure:
- The operator is a pass-through for backpressure and the behavior is determined by the upstream and downstream rail behaviors.
- Scheduler:
mapOptional
does not operate by default on a particularScheduler
.
History: 2.0.8 - experimental
- Type Parameters:
R
- the output value type- Parameters:
mapper
- the mapper function turning Ts into optional of Rs.errorHandler
- the function called with the current repeat count and failureThrowable
and should return one of theParallelFailureHandling
enumeration values to indicate how to proceed.- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- ifmapper
orerrorHandler
isnull
- Since:
- 3.0.0
-
flatMapStream
@CheckReturnValue @BackpressureSupport(FULL) @SchedulerSupport("none") @NonNull public final <@NonNull R> @NonNull ParallelFlowable<R> flatMapStream(@NonNull @NonNull Function<? super @NonNull T,? extends java.util.stream.Stream<? extends @NonNull R>> mapper)
Maps each upstream item on each rail into a Stream
and emits the Stream
's items to the downstream in a sequential fashion. Due to the blocking and sequential nature of Java Stream
s, the streams are mapped and consumed in a sequential fashion without interleaving (unlike a more general flatMap(Function)
). Therefore, flatMapStream
and concatMapStream
are identical operators and are provided as aliases.The operator closes the Stream
upon cancellation and when it terminates. The exceptions raised when closing a Stream
are routed to the global error handler (RxJavaPlugins.onError(Throwable)
). If a Stream
should not be closed, turn it into an Iterable
and use flatMapIterable(Function)
:
source.flatMapIterable(v -> createStream(v)::iterator);
Note that Stream
s can be consumed only once; any subsequent attempt to consume a Stream
will result in an IllegalStateException
. Primitive streams are not supported and items have to be boxed manually (e.g., via IntStream.boxed()
):
source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed());
Stream
does not support concurrent usage, so creating and/or consuming the same instance multiple times from multiple threads can lead to undefined behavior.
- Backpressure:
- The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator
prefetches
Flowable.bufferSize()
items of the upstream (then 75% of it after the 75% received) and caches them until they are ready to be mapped into Stream
s after the current Stream
has been consumed. - Scheduler:
flatMapStream
does not operate by default on a particular Scheduler
.
- Type Parameters:
R
- the element type of the Stream
s and the result
- Parameters:
mapper
- the function that receives an upstream item and should return a Stream
whose elements will be emitted to the downstream
- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- if mapper is null
- Since:
- 3.0.0
- See Also:
flatMap(Function)
,flatMapIterable(Function)
,flatMapStream(Function, int)
-
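The behavior described above can be sketched as follows; the input range and the v * 10 expansion are arbitrary values chosen for this example:

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.List;
import java.util.stream.IntStream;

public class FlatMapStreamExample {
    public static void main(String[] args) {
        // Each upstream value v is expanded into the boxed stream [v * 10, v * 10 + 1];
        // inner streams are consumed sequentially on each rail, without interleaving.
        List<Integer> result = Flowable.range(1, 3)
                .parallel(2)
                .flatMapStream(v -> IntStream.rangeClosed(v * 10, v * 10 + 1).boxed())
                .sequential()
                .sorted(Integer::compare)
                .toList()
                .blockingGet();

        System.out.println(result); // [10, 11, 20, 21, 30, 31]
    }
}
```

Note the IntStream.boxed() call: as the documentation states, primitive streams must be boxed manually before they can be returned from the mapper.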
flatMapStream
@CheckReturnValue @BackpressureSupport(FULL) @SchedulerSupport("none") @NonNull public final <@NonNull R> @NonNull ParallelFlowable<R> flatMapStream(@NonNull @NonNull Function<? super @NonNull T,? extends java.util.stream.Stream<? extends @NonNull R>> mapper, int prefetch)
Maps each upstream item of each rail into a Stream
and emits the Stream
's items to the downstream in a sequential fashion. Due to the blocking and sequential nature of Java Stream
s, the streams are mapped and consumed in a sequential fashion without interleaving (unlike a more general flatMap(Function)
). Therefore, flatMapStream
and concatMapStream
are identical operators and are provided as aliases.The operator closes the Stream
upon cancellation and when it terminates. The exceptions raised when closing a Stream
are routed to the global error handler (RxJavaPlugins.onError(Throwable)
). If a Stream
should not be closed, turn it into an Iterable
and use flatMapIterable(Function, int)
:
source.flatMapIterable(v -> createStream(v)::iterator, 32);
Note that Stream
s can be consumed only once; any subsequent attempt to consume a Stream
will result in an IllegalStateException
. Primitive streams are not supported and items have to be boxed manually (e.g., via IntStream.boxed()
):
source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed(), 32);
Stream
does not support concurrent usage, so creating and/or consuming the same instance multiple times from multiple threads can lead to undefined behavior.
- Backpressure:
- The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator
prefetches the given amount of upstream items and caches them until they are ready to be mapped into
Stream
s after the current Stream
has been consumed. - Scheduler:
flatMapStream
does not operate by default on a particular Scheduler
.
- Type Parameters:
R
- the element type of the Stream
s and the result
- Parameters:
mapper
- the function that receives an upstream item and should return a Stream
whose elements will be emitted to the downstream
prefetch
- the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received- Returns:
- the new
ParallelFlowable
instance - Throws:
java.lang.NullPointerException
- if mapper is null
java.lang.IllegalArgumentException
- if prefetch is non-positive
- Since:
- 3.0.0
- See Also:
flatMap(Function, boolean, int)
,flatMapIterable(Function, int)
-
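A minimal sketch of the prefetch overload; the prefetch value of 4 and the v/-v expansion are arbitrary choices made up for this example:

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.List;
import java.util.stream.Stream;

public class FlatMapStreamPrefetchExample {
    public static void main(String[] args) {
        // The second argument caps how many upstream items each rail
        // requests upfront (replenished in 75% batches as described above).
        List<Integer> result = Flowable.range(1, 4)
                .parallel(2)
                .flatMapStream(v -> Stream.of(v, -v), 4)
                .sequential()
                .sorted(Integer::compare)
                .toList()
                .blockingGet();

        System.out.println(result); // [-4, -3, -2, -1, 1, 2, 3, 4]
    }
}
```

A smaller prefetch bounds memory at the cost of more frequent upstream requests; the no-argument overload defaults to Flowable.bufferSize().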
collect
@CheckReturnValue @NonNull @BackpressureSupport(UNBOUNDED_IN) @SchedulerSupport("none") public final <@NonNull A,@NonNull R> @NonNull Flowable<R> collect(@NonNull @NonNull java.util.stream.Collector<@NonNull T,@NonNull A,@NonNull R> collector)
Reduces all values within a 'rail' and across 'rails' with the callbacks of the given Collector
into one Flowable
containing a single value. Each parallel rail receives its own Collector.accumulator()
and Collector.combiner()
.
- Backpressure:
- The operator honors backpressure from the downstream and consumes
the upstream rails in an unbounded manner (requesting
Long.MAX_VALUE
). - Scheduler:
collect
does not operate by default on a particular Scheduler
.
- Type Parameters:
A
- the accumulator type
R
- the output value type
- Parameters:
collector
- the Collector
instance
- Returns:
- the new
Flowable
instance emitting the collected value. - Throws:
java.lang.NullPointerException
- if collector is null
- Since:
- 3.0.0
-
-
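To illustrate the per-rail accumulator/combiner behavior above, here is a sketch that sums values across rails with a JDK Collector; the input range and parallelism level are arbitrary choices for this example:

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.stream.Collectors;

public class ParallelCollectExample {
    public static void main(String[] args) {
        // Each rail accumulates into its own container via the Collector's
        // accumulator; the per-rail partial sums are then merged with the
        // Collector's combiner into a single downstream value.
        Integer total = Flowable.range(1, 100)
                .parallel(4)
                .collect(Collectors.summingInt(Integer::intValue))
                .blockingSingle();

        System.out.println(total); // 5050
    }
}
```

Because each rail gets its own accumulator, the chosen Collector's combiner must correctly merge partial containers, just as it must for a parallel java.util.stream pipeline.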