Key features of Reactor:
· Avoids callback hell: multiple asynchronous tasks can be composed and orchestrated declaratively.
· Analogous to an assembly line.
· Publisher: publishes data. Each operator wraps the publisher from the previous step in a new instance.
· Subscriber: consumes data. A publisher does not publish any data until a subscriber subscribes to it. Internally, a single request signal propagates upstream from the subscriber back to the source publisher.
· Supports backpressure: when a downstream task processes data more slowly than the upstream task produces it, the downstream can explicitly request n elements from the source; otherwise the source simply pushes whatever data is available (hybrid push/pull model).
· Cold sequence vs. hot sequence: a cold sequence starts anew for each subscriber, including at the source of the data. For example, if the source wraps an HTTP call, a new HTTP request is made for each subscription. With a hot sequence, a subscriber receives only the data emitted after it subscribes. (Some hot sequences cache and replay history for new subscribers, and some emit data even when no subscriber is present.)
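The cold/hot distinction can be sketched as follows. This is a minimal illustration, not a real HTTP client: the counter stands in for the HTTP call, and the class and method names are made up for this example. The hot source uses a Sinks.Many multicast sink, which is one of several ways to build a hot sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

final class ColdHotDemo {

    // Returns {source executions before subscribing, executions after two subscriptions}.
    static int[] coldCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Hypothetical stand-in for an HTTP call: every execution bumps the counter.
        Mono<String> cold = Mono.fromSupplier(() -> "response #" + calls.incrementAndGet());

        int beforeSubscribe = calls.get(); // assembling the pipeline runs nothing

        cold.subscribe(v -> System.out.println("first subscriber: " + v));
        cold.subscribe(v -> System.out.println("second subscriber: " + v));

        return new int[] { beforeSubscribe, calls.get() };
    }

    // Returns what a subscriber sees when it joins a hot source late.
    static List<String> lateHotSubscriber() {
        Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();
        List<String> received = new ArrayList<>();

        sink.tryEmitNext("a");                  // emitted before anyone subscribed: lost
        sink.asFlux().subscribe(received::add); // subscriber joins late
        sink.tryEmitNext("b");                  // emitted after subscription: delivered
        sink.tryEmitComplete();

        return received;
    }

    public static void main(String[] args) {
        int[] counts = coldCalls();
        System.out.println(counts[0] + " call(s) before subscribe, " + counts[1] + " after");
        System.out.println("late hot subscriber saw " + lateHotSubscriber());
    }
}
```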
Flux:
A sequence of 0..N items.
Terminated by a complete signal or an error signal.
Emits data: handled by the subscriber's onNext method.
Emits a complete signal: handled by the subscriber's onComplete method.
Emits an error: handled by the subscriber's onError method.
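As a small sketch of the three signal types, the helper below (a hypothetical name) records every signal a Flux delivers, using the three-argument Flux.subscribe overload. Note that the sequence that ends with an error never delivers onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class FluxSignalsDemo {

    // Subscribes to the given Flux and records every signal it receives, in order.
    static List<String> record(Flux<Integer> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                value -> signals.add("onNext(" + value + ")"),
                error -> signals.add("onError(" + error.getMessage() + ")"),
                () -> signals.add("onComplete"));
        return signals;
    }

    // A sequence that emits one value and then fails.
    static Flux<Integer> failing() {
        return Flux.just(1).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    public static void main(String[] args) {
        System.out.println(record(Flux.just(1, 2)));
        System.out.println(record(failing()));
    }
}
```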
Mono:
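A sequence of 0 or 1 item.
onNext is normally followed immediately by onComplete; an empty Mono emits only onComplete, and a failed Mono emits only onError. The combination onNext + onError is forbidden.
Can be concatenated with another publisher (Mono.concatWith) to become a Flux.
Mono<Void> represents work that produces no value and only signals completion or error, much like a Runnable.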
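The following sketch illustrates two of the points above: concatenating a Mono with another publisher yields a Flux, and a Mono<Void> behaves like a deferred Runnable. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MonoDemo {

    // Mono.concatWith returns a Flux, because the result may now hold more than one item.
    static List<String> concat() {
        Flux<String> flux = Mono.just("a").concatWith(Flux.just("b", "c"));
        return flux.collectList().block();
    }

    // A Mono<Void> carries no value; it only tells the subscriber that the work finished.
    static boolean[] runVoidMono() {
        AtomicBoolean ran = new AtomicBoolean();
        Mono<Void> task = Mono.fromRunnable(() -> ran.set(true));

        boolean ranBeforeSubscribe = ran.get(); // still false: nothing has subscribed yet
        task.block();                           // subscribes and waits for onComplete

        return new boolean[] { ranBeforeSubscribe, ran.get() };
    }

    public static void main(String[] args) {
        System.out.println(concat());
        System.out.println(runVoidMono()[1]);
    }
}
```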
Ways to create subscriber:
Flux.subscribe with lambdas: it can take a consumer for each data item, a consumer for an error from upstream, and a Runnable that runs when the sequence completes successfully (it does not run on error).
Extend BaseSubscriber: override hookOnSubscribe, which is called when the subscription is created, to request the first element, and override hookOnNext to process each element and request the next one.
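A minimal sketch of the BaseSubscriber approach, requesting one element at a time. The doOnRequest operator is added here only to make the request signals visible in the log; it is not needed for the subscriber to work.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class OneByOneSubscriberDemo {

    // Consumes a three-element Flux one element per request and logs what happens.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux.range(1, 3)
            .doOnRequest(n -> log.add("request(" + n + ")")) // observe demand travelling upstream
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // ask for the first element only
                }

                @Override
                protected void hookOnNext(Integer value) {
                    log.add("onNext(" + value + ")");
                    request(1); // ask for the next element once this one is handled
                }
            });

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every request in the log should be for a single element, in contrast to the unbounded request that the lambda-based subscribe variants issue.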
Cancel subscription:
The lambda-based Flux.subscribe variants return a Disposable. Disposable.dispose() sends a cancel signal to the source so that it stops producing data.
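As a sketch of cancellation, the example below subscribes to a source that never emits (Flux.never, chosen only to keep the example deterministic) and uses doOnCancel to observe the cancel signal triggered by dispose().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

final class CancelDemo {

    // Returns {cancel seen before dispose, cancel seen after dispose, isDisposed after dispose}.
    static boolean[] run() {
        AtomicBoolean cancelled = new AtomicBoolean();

        Disposable subscription = Flux.never()
                .doOnCancel(() -> cancelled.set(true)) // fires when the cancel signal passes through
                .subscribe();

        boolean before = cancelled.get();
        subscription.dispose(); // sends the cancel signal upstream

        return new boolean[] { before, cancelled.get(), subscription.isDisposed() };
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("cancelled before dispose: " + result[0]);
        System.out.println("cancelled after dispose:  " + result[1]);
    }
}
```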
Backpressure:
Each time the subscriber requests data, the source pushes data until the requested amount is reached.
Prefetching is supported: some operators request an initial batch from upstream before the downstream asks for anything, and replenish it once most of the batch (typically 75%) has been consumed.
Disable backpressure: call BaseSubscriber.requestUnbounded() (equivalent to request(Long.MAX_VALUE)) to tell the producer to emit as fast as it can. This is the default behavior of hookOnSubscribe.
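Flux.create accepts an overflow strategy (FluxSink.OverflowStrategy) that decides what happens when the producer outpaces downstream requests:
IGNORE: completely ignore downstream backpressure requests. This may yield an IllegalStateException when queues get full downstream.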
ERROR: signal an IllegalStateException when the downstream can’t keep up.
DROP: drop the incoming signal if the downstream is not ready to receive it.
LATEST: let downstream only get the latest signals from upstream.
BUFFER (default): buffer all signals with an unbounded buffer.
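The difference between DROP and BUFFER can be sketched with a slow subscriber. In this illustrative setup (names are made up), the source emits five items right away, the subscriber first requests two and only later requests three more.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

final class OverflowStrategyDemo {

    // Returns everything the slow subscriber ends up receiving under the given strategy.
    static List<Integer> receivedWith(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();

        BaseSubscriber<Integer> slowSubscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // initial demand is smaller than what the source emits
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value); // deliberately does not request more
            }
        };

        Flux.<Integer>create(sink -> {
            for (int i = 1; i <= 5; i++) {
                sink.next(i); // the producer ignores demand and emits everything at once
            }
            sink.complete();
        }, strategy).subscribe(slowSubscriber);

        slowSubscriber.request(3); // late demand: only a buffering strategy can still serve it
        return received;
    }

    public static void main(String[] args) {
        System.out.println("DROP:   " + receivedWith(FluxSink.OverflowStrategy.DROP));
        System.out.println("BUFFER: " + receivedWith(FluxSink.OverflowStrategy.BUFFER));
    }
}
```

With DROP the items emitted while there was no demand are gone, so the late request finds nothing; with BUFFER they were queued and are delivered once requested.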
Ways to create flux:
Factory method: Flux.just, Flux.range.
Flux.generate:
Generates a synchronous sequence, one element per callback invocation.
First parameter: a supplier that provides the initial state.
Second parameter: a function that receives the current state and a SynchronousSink, emits at most one value, and returns the next state. The state may be a mutable object.
sink.next emits the next value (at most once per invocation).
sink.complete emits the complete signal.
Optional third parameter: a consumer that receives the final state and can be used for cleanup.
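A minimal sketch of the three-argument Flux.generate, using an Integer counter as the state. The multiplication-table content is arbitrary and only serves to show state flowing from one invocation to the next.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class GenerateDemo {

    // Emits the first three lines of the 3-times table, one per generator invocation.
    static List<String> timesTable() {
        return Flux.<String, Integer>generate(
                () -> 1,                                      // initial state
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3 * state); // at most one next per call
                    if (state == 3) {
                        sink.complete();
                    }
                    return state + 1;                         // state for the next invocation
                },
                finalState -> System.out.println("cleanup, final state: " + finalState))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        timesTable().forEach(System.out::println);
    }
}
```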
Flux.create:
Generates a sequence asynchronously or synchronously; next can be called from multiple threads.
Can emit several elements per round.
Can bridge an existing asynchronous listener API.
No state is passed between invocations (unlike Flux.generate).
sink.next emits the next value.
sink.complete emits the complete signal.
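Bridging a listener API with Flux.create might look like the sketch below. ChunkListener and EventSource are hypothetical types defined inside the example purely for illustration; a real bridge would register with whatever callback API the application already has. The events are fired synchronously here to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class CreateBridgeDemo {

    // Hypothetical callback API, defined here only for illustration.
    interface ChunkListener {
        void onChunk(List<String> chunk);
        void onDone();
    }

    // Hypothetical event source that pushes chunks to one registered listener.
    static final class EventSource {
        private ChunkListener listener;

        void register(ChunkListener listener) { this.listener = listener; }
        void fire(String... items) { listener.onChunk(List.of(items)); }
        void finish() { listener.onDone(); }
    }

    static List<String> run() {
        EventSource source = new EventSource();

        // The creation block runs on subscription and wires the listener to the sink.
        Flux<String> bridge = Flux.create(sink -> source.register(new ChunkListener() {
            @Override
            public void onChunk(List<String> chunk) {
                chunk.forEach(sink::next); // several elements per round
            }

            @Override
            public void onDone() {
                sink.complete();
            }
        }));

        List<String> received = new ArrayList<>();
        bridge.subscribe(received::add);

        source.fire("a", "b");
        source.fire("c");
        source.finish();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```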
Flux.push:
Generates an asynchronous sequence from a single producer thread.
Can bridge an existing asynchronous listener API.
Only one thread at a time may invoke next, complete, or error.
Both Flux.create and Flux.push can register callbacks on the sink inside the creation block:
sink.onRequest(n -> ...): reacts to downstream requests, which lets the source pull data on demand.
sink.onCancel: performs any action specific to cancellation, before the cleanup done in onDispose.
sink.onDispose: performs cleanup when the Flux completes, errors out, or is cancelled.
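The sink callbacks can be sketched with Flux.push as follows. The example only logs when each callback fires; in real code onCancel and onDispose would typically unregister a listener or release a resource.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

final class SinkCallbacksDemo {

    // Emits one value, then cancels the subscription and returns the callback log.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux<Integer> flux = Flux.push(sink -> {
            sink.onRequest(n -> log.add("onRequest"));   // notified about downstream demand
            sink.onCancel(() -> log.add("onCancel"));    // cancellation only
            sink.onDispose(() -> log.add("onDispose"));  // complete, error, or cancel
            sink.next(1);
        });

        Disposable subscription = flux.subscribe(v -> log.add("onNext(" + v + ")"));
        subscription.dispose(); // cancel: triggers onCancel and onDispose

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```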
Flux.handle:
Applied to an existing Flux (or Mono).
Uses a SynchronousSink, so it emits at most one element per input element.
Can skip values, so it works like a combination of filter and map.
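A short sketch of handle acting as filter plus map in one step: odd numbers are skipped by simply not calling sink.next, and even numbers are mapped to a String.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class HandleDemo {

    static List<String> evensAsText() {
        return Flux.range(1, 6)
            .<String>handle((i, sink) -> {
                if (i % 2 == 0) {
                    sink.next("even:" + i); // map: Integer -> String
                }
                // filter: odd values are skipped by not emitting anything
            })
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(evensAsText());
    }
}
```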
Thread & scheduling:
Use Mono/Flux.publishOn(scheduler) or Mono/Flux.subscribeOn(scheduler) to specify the scheduler. A scheduler can be obtained in the following ways:
Schedulers.immediate(): the submitted Runnable is executed directly on the current thread.
Schedulers.single(): reuses the same single thread for all callers.
Schedulers.newSingle(name): creates a dedicated thread for each call.
Schedulers.boundedElastic(): a worker pool that grows on demand up to a cap (by default 10 x the number of CPU cores), suited to blocking work. Schedulers.elastic() was its unbounded predecessor and is deprecated.
Schedulers.parallel(): a fixed pool with as many workers as CPU cores, suited to non-blocking, CPU-bound work.
publishOn is an operator placed in the processing chain; it switches the scheduler for the operators that come after it.
subscribeOn affects where the subscription, and therefore the source emission, runs. Its position in the chain does not matter. When several subscribeOn operators are chained, only the one closest to the source takes effect.
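The effect of publishOn and subscribeOn can be sketched by recording thread names on either side of publishOn. The choice of Schedulers.boundedElastic() and Schedulers.parallel() is arbitrary here; any two distinct schedulers would show the same split.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class SchedulingDemo {

    // Returns {thread seen upstream of publishOn, thread seen downstream of publishOn}.
    static String[] run() {
        String[] threads = new String[2];

        Flux.range(1, 3)
            // Upstream of publishOn: runs where the subscription happened (see subscribeOn).
            .doOnNext(i -> threads[0] = Thread.currentThread().getName())
            .publishOn(Schedulers.parallel())
            // Downstream of publishOn: runs on the scheduler passed to publishOn.
            .doOnNext(i -> threads[1] = Thread.currentThread().getName())
            // Placed last, yet it still decides where the source emits.
            .subscribeOn(Schedulers.boundedElastic())
            .blockLast();

        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("source emitted on: " + threads[0]);
        System.out.println("after publishOn:   " + threads[1]);
    }
}
```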
Error Handling:
Flux.subscribe(value -> handlingCallback, error -> errorHandler): the error callback plays the role of the catch block of a try-catch in Java.
Flux.onErrorReturn(value): emits the fallback value on any error. Flux.onErrorReturn(error -> errorPredicate, value): emits the fallback value only if errorPredicate evaluates to true.
Flux.onErrorComplete: swallows the error and completes the sequence normally.
Flux.onErrorResume(error -> fallbackPublisher): switches to an alternative source of data (e.g. a cache). Return Flux.error(...) from the function to rethrow the error.
Flux.onErrorMap(error -> exception): rethrows a wrapper exception built from the original error.
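The error-handling operators above can be compared side by side on the same failing sequence. This is a sketch: the failing source, the fallback values, and the "cached" publisher are all made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class ErrorHandlingDemo {

    // A sequence that emits "a" and then fails.
    static Flux<String> failing() {
        return Flux.just("a").concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // Static fallback value, like returning a default from a catch block.
    static List<String> withReturn() {
        return failing().onErrorReturn("fallback").collectList().block();
    }

    // Fallback publisher; the "cache" here is just a hard-coded Flux.
    static List<String> withResume() {
        return failing().onErrorResume(e -> Flux.just("cached")).collectList().block();
    }

    // Swallow the error and complete normally.
    static List<String> withComplete() {
        return failing().onErrorComplete().collectList().block();
    }

    // Wrap the original error and let it propagate to the caller.
    static String withMap() {
        try {
            failing()
                .onErrorMap(e -> new RuntimeException("wrapped: " + e.getMessage(), e))
                .blockLast();
            return "no error";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withReturn());
        System.out.println(withResume());
        System.out.println(withComplete());
        System.out.println(withMap());
    }
}
```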
Logging:
Flux.doOnError(error -> errorHandler) can be used to log an error; the error is still passed downstream.
Flux.doOnSubscribe combined with Flux.doFinally can be used to measure execution time.
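A minimal sketch of both logging techniques: doOnError logs without consuming the error, and doOnSubscribe plus doFinally bracket the sequence to measure elapsed time. A plain list stands in for a real logger here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

final class LoggingDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>(); // stand-in for a real logger
        AtomicLong startNanos = new AtomicLong();

        Flux.just(1, 2)
            .concatWith(Flux.error(new IllegalStateException("boom")))
            .doOnSubscribe(s -> startNanos.set(System.nanoTime()))      // start the clock
            .doOnError(e -> log.add("logged: " + e.getMessage()))       // side effect only
            .doFinally(signal -> {
                long elapsedMs = (System.nanoTime() - startNanos.get()) / 1_000_000;
                log.add("doFinally(" + signal + ") after " + elapsedMs + " ms");
            })
            .subscribe(
                value -> { },
                e -> log.add("subscriber saw: " + e.getMessage()));     // error still arrives

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```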
Reference:
[Reactive Programming] A deep dive into the Schedulers package, the new thread pool model - Juejin (juejin.cn)