Reactor study notes
  xHZVWyBuuoyF 2023年11月02日 36 0

Key features of Reactor:

·       Avoiding callback hell.  Multiple tasks can be orchestrated.

·       Analogous to assembly line.

·       Publisher:  publishes data, each operator wraps the previous step publisher into a new instance.

·       Subscriber: consumes data, publisher will not publish any data util a subscriber subscribes to it. Internally, a single request signal propagates upstream from subscriber back to publisher.

·       Supports backpressure: When downstream tasks process data slower than upstream task, downstream task can explicitly request n elements from source, otherwise, the source just push whatever data available. (Hybrid push/pull mode).

·       Cold sequence VS Hot sequence:  A cold sequence starts anew for each subscriber, including at the source of data. For example, if the source wraps an HTTP call, a new HTTP request is made for each subscription.  A hot sequence receives new data after they subscribed.  (some hot sequence may cache and replay history for new subscriber, or emit data if no subscriber is there).

Flux:

A sequence of 0 .. N items.

Terminated by a complete signal or an error.

Emits data:  handled by subscribers onNext method.

Emits complete signal:  handled by subscribers onComplete method.

Emits error: handled by subscribers onError method.

Mono:

A sequence of 0 or 1 item.

onNext + onComplete,  onNext + onError is illegal.

Can concat with other publisher to become flux.

Mono<Void> like Runnable.

Ways to create subscriber:

Flux.subscribe method, which can take up a lambda expression that accepts data, a lambda expression accepts error from upstream, a runnable executes on error or all source data has been emitted.

Create instance of BaseSubscriber   inherit hookOnSubscribe method when subscription is created and request the first element. Inherit hookOnNext method to request next element.

Cancel subscription:

Flux.subscribe method returns a Disposable.  Disposable.dispose() method sends cancel signal to source to stop producing data.

Backpressure:

Each time data is requested by subscriber, source pushes data until request amount is reached.              

Prefetching is supported, can specify a threshold of the request buffer, if data amount is less than threshold, source pushes data and fills the buffer even if no request is made.

Disable backpressure:  use BaseSubscriber.requestUnbounded  to tell producer to produce as fast as it can  (Default behavior of hookOnSubscribe).

Flux.create can specify an overflow strategy:

      IGNORE: completely ignore downstream backpressure request. This may yield IllegalStateException when queue gets full downstream.

      ERROR: signal an IllegalStateException when the downstream can’t keep up.

      DROP: drop the incoming signal if the downstream is not ready to receive it.

      LATEST: let downstream only get the latest signals from upstream.

      BUFFER (default): buffer all signals with an unbounded buffer.


Ways to create flux:

Factory method:  Flux.just,  Flux.range.

Flux.generate:

Generates synchronous sequence.

First parameter: A lambda expression that generates initial value.

Internal mutable state to augment data generation, Second parameter: A lambda expression that generates next state.

Sink.next emit next value.

Sink.complete emits complete signal.

An optional consumer receives the final state, can be used for data cleanup.  

Flux.create:

Generates asynchronous sequence from multiple thread.

Can generate chunks of data per round.

Can bridge existing async listener.

No internal state exposed.

Sink.next emit next value.

Sink.complete emits complete signal.

Flux.push:

Generate asynchronous sequence from one thread.

Can bridge existing async listener.

Only one thread can invoke next, complete, error.

Both flux.create & flux.push can register a few callbacks with sink in the creation block.

Sink.onRequest({ numberOfItems -> }:  defines the behavior to pull data

Sink.onCancel:  used to perform any action specific to cancellation prior to cleanup with onDispose

Sink.onDispose: used to perform cleanups when the flux completes, errors out or  is cancelled.

Flux.handle

Applied on existing flux.

Uses only synchronousSink and only allows one-by-one emission.

Can skip some values, like combination of filter + map.


Thread & scheduling:

Use Mono/Flux.publishOn(scheduler) or Mono/Flux.subscribeOn(scheduler) to specify scheduler to use. The scheduler can be created in the following ways: 

Schedulers.immediate: submitted runnable will be directly executed on current thread

Schedulers.single: reuses the same thread for all callers

Schedulers.newSingle: a dedicated thread per call

Schedulers.elastic or Schedulers.boundedElastic or Schedulers.parallel.  Used by async sequence to ensure that the number of threads created will not exceed certain upperbound.

Scheduler.publishOn  placed in the processing chain and changes the scheduler of the subsequent processors

Schedulers.subscribeOn  affects only the source of emission. The position placed in the processing chain does not matter.  With multiple subscribed on operator, only the first one matters.

Error Handling:

Flux.subscribe(value -> handlingCallback, error -> errorHandler):   equivalent to a try-catch block in java.

Flux.onErrorReturn(value)  or Flux.onErrorReturn(error -> errorPredicate, value):   returns value only if errorPredicate evaluates to true.

Flux.onErrorComplete:  completely swallow the error.

Flux.onErrorResume(error -> errorHandler):  alternative way of retrieving data. (Eg. From cache).  Use Flux.error() to rethrow the error.

Flux.onErrorMap(error -> exception): rethrow a wrapper exception built from the original error.


Logging:

Flux.doOnError(error -> errorHandler)   can be used to log error, and error will still be passed downstream.

Flux.doOnSubscribe + Flux.doFinally  can be used to calculate execution time.


Reference:

【响应式编程】 - 深度理解线程池新模型Schedulers包 - 掘金 (juejin.cn)



【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  xHZVWyBuuoyF   2023年11月02日   37   0   0 reactor
xHZVWyBuuoyF