reactor之数据源的产生
Posted aofengdaxia
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了reactor之数据源的产生相关的知识,希望对你有一定的参考价值。
Flux和Mono作为数据源,产生数据的方法有好多种,本文中对常用的进行罗列和简述。不足的地方可以去源文档中查看。
空数据源或者特殊数据源
empty
通过empty方法,可以创建一个空的数据流。
Flux.empty();
Mono.empty();
error
通过error方法,可以创建一个错误的数据流。
Flux.error(new RuntimeException("error"));
Mono.error(new RuntimeException("error"));
never
通过never方法,可以创建一个永远不会发出任何信号的数据流。
Flux.never();
Mono.never();
简单的数据源
just
通过just方法,可以创建一个包含固定元素的数据流。
Flux.just("foo", "bar", "foobar");
Mono.just("foo");
range
通过range方法,可以创建一个包含指定范围的整数的数据流。
FLux<Integer> flux = Flux.range(1, 10);
fromArray
通过fromArray方法,可以创建一个包含数组元素的数据流。
Flux.fromArray(new Integer[]1, 2, 3);
fromIterable
通过fromIterable方法,可以创建一个包含Iterable元素的数据流。
Flux.fromIterable(Arrays.asList("foo", "bar", "foobar"));
fromStream
通过fromStream方法,可以创建一个包含Stream元素的数据流。
Flux.fromStream(Stream.of("foo", "bar", "foobar"));
fromFuture
通过fromFuture方法,可以创建一个包含CompletableFuture元素的数据流。
Mono.fromFuture(CompletableFuture.completedFuture("foo"));
fromRunnable
通过fromRunnable方法,可以创建一个包含Runnable元素的数据流。
Mono.fromRunnable(() -> System.out.println("foo"));
Interval产生间隔时间发送数据
interval
通过interval方法,可以创建一个每隔固定时间就发送元素的数据流。
Flux.interval(Duration.ofSeconds(1));
intervalMillis
通过intervalMillis方法,可以创建一个每隔固定时间就发送元素的数据流。
Flux.intervalMillis(1000);
高级方法generate和create
generate
通过generate方法,可以自定义数据流的产生。 generate方法同步地,一次产生一个元素。意味着,当一个元素被消费后,才会产生下一个元素。sink是一个SynchronousSink,它提供了next()方法来产生元素。
generate有多个重载方法。
Flux.generate(generator);
Flux.generate(stateSupplier,generator);
/**
* @param stateSupplier 一个无参函数,返回一个初始状态
* @param generator 一个BiFunction,接受状态和SynchronousSink,返回新的状态
* @param stateConsumer 一个Consumer,接受状态,用于清理资源
*/
Flux.generate(stateSupplier,generator,stateConsumer)
具体的demo如下:
Flux.generate(sink->
sink.next("Hello");
sink.complete();
);
Flux.generate(()->0,(state,sink)->
sink.next("3 x "+state+" = "+3*state);
if(state==10)sink.complete();
return state+1;
);
create
通过create方法,可以自定义数据流的产生。create方法相对于generate方法更加高级,既可以同步地,一次产生一个元素,也可以异步地,一次产生多个元素。 该方法用到了 FluxSink,后者同样提供 next,error 和
complete 等方法。 与 generate 不同的是,create 不需要状态值,另一方面,它可以在回调中触发 多个事件(即使是在未来的某个时间)。
//create方法的接口
Flux.create(Consumer<? super FluxSink<T>>emitter)
/**
*
*/
Flux.create(Consumer<? super FluxSink<T>>emitter,FluxSink.OverflowStrategy overflowStrategy)
假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。如下:
interface MyEventListener<T>
void onDataChunk(List<T> chunk);
void processComplete();
你可以使用 create 方法来实现这个 API:
Flux.create(sink->
MyEventListener<T> listener=new MyEventListener<T>()
@Override
public void onDataChunk(List<T> chunk)
for(T value:chunk)
sink.next(value);
@Override
public void processComplete()
sink.complete();
;
source.register(listener);
);
此外,既然 create 可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy 来定义背压行为。
- IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。
- ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。
- DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
- LATEST:让下游只得到上游最新的元素。
- BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。
推送(push)模式
在推送模式中,数据源会主动推送数据给下游。这种模式下,数据源是生产者,下游是消费者。 create 的一个变体是 push,适合生成事件流。与 create类似,
push 也可以是异步地, 并且能够使用以上各种溢出策略(overflow
strategies)管理背压。每次只有一个生成线程可以调用 next,complete 或 error。
Flux<String> bridge=Flux.push(sink->
myEventProcessor.register(new SingleThreadEventListener<String>()
public void onDataChunk(List<String> chunk)
for(String s:chunk)
sink.next(s);
public void processComplete()
sink.complete();
public void processError(Throwable e)
sink.error(e);
);
);
推送/拉取(push/pull)混合模式
不像 push,create 可以用于 push 或 pull 模式,因此适合桥接监听器的 的 API,因为事件消息会随时异步地到来。回调方法 onRequest 可以被注册到 FluxSink
以便跟踪请求。这个回调可以被用于从源头请求更多数据,或者通过在下游请求到来 的时候传递数据给 sink 以实现背压管理。这是一种推送/拉取混合的模式, 因为下游可以从上游拉取已经就绪的数据,上游也可以在数据就绪的时候将其推送到下游。
Flux<String> bridge = Flux.create(sink ->
myMessageProcessor.register(
new MyMessageListener<String>()
public void onMessage(List<String> messages)
for(String s : messages)
sink.next(s);
);
sink.onRequest(n ->
List<String> messages = myMessageProcessor.request(n);
for(String s : message)
sink.next(s);
);
在上述的文章中,拆选和列举了常见的生成数据源的方式和方法。仅仅是为了方便更好的入门和理解,严谨性和完整性很难保证。如果要进一步学习,或者感觉有谬误,应该去官网查看更多的资料。
以上是关于reactor之数据源的产生的主要内容,如果未能解决你的问题,请参考以下文章
Flume系列之:记录一次上游数据库产生大量数据导致flume agent数据堆积和服务器IO打满,严重影响下游任务的快速应对处理方法