Go中响应式编程库RxGo详细介绍

Posted huageyiyangdewo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go中响应式编程库RxGo详细介绍相关的知识,希望对你有一定的参考价值。

最近的项目用到了 RxGo ,因为之前从没有接触过,特意去学了学,特此记录下。文章很多内容是复制了参考资料或者官方文档。如果涉及侵权,请联系删除,谢谢。

1、RxGo简介

1.1 基础介绍

RxGo是一个基于Go语言的响应式编程库,它提供了一种简单而强大的方式来处理异步事件流和数据流。RxGo的设计灵感来自于ReactiveX,它提供了类似于ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。

RxGo的目标是提供一种简单而强大的方式来处理异步事件流和数据流,使得开发人员可以更容易地编写高效、可维护和可扩展的代码。RxGo的特点包括:

  1. 响应式编程:RxGo提供了Observable和Observer两个核心概念,使得开发人员可以更容易地处理异步事件流和数据流。
  2. 操作符:RxGo提供了类似于ReactiveX的操作符,如map、filter、reduce等,使得开发人员可以更容易地对事件流进行转换、过滤和聚合等操作。
  3. 调度器:RxGo提供了调度器,使得开发人员可以更容易地控制事件流的执行线程和顺序。
  4. 可组合性:RxGo的操作符具有可组合性,使得开发人员可以更容易地组合多个操作符来实现复杂的操作。
  5. 高效性:RxGo的设计和实现都非常高效,可以处理大量的事件流和数据流。

总之,RxGo是一个非常强大和实用的响应式编程库,它可以帮助开发人员更容易地处理异步事件流和数据流,提高代码的可维护性和可扩展性。

1.2 RxGo 数据流程图

RxGo的实现基于管道的概念。管道是由通道连接的一系列阶段,其中每个阶段是运行相同功能的一组goroutine。

  • 使用Just操作符创建一个基于固定列表的静态可观测数据。
  • 使用Map操作符定义了一个转换函数(把圆形变成方形)。
  • Filter操作符过滤掉黄色方形。

从上面的例子中可以看出来,最终生成的数据被发送到一个通道中,消费者读取数据进行消费。RxGo中有很多种消费和生成数据的方式,发布结果到通道中只是其中一种方式。

2、快速入门

2.1 安装 RxGo v2

go get -u github.com/reactivex/rxgo/v2

2.2 简单案例

我们先写一个简单的案例,来学习RxGo的简单使用。

package main

import (
  "fmt"

  "github.com/reactivex/rxgo/v2"
)

func main() 
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch 
    fmt.Println(item.V)
  

使用 RxGo 的一般流程如下:

  • 使用相关的 Operator 创建 ObservableOperator 就是用来创建 Observable 的。
  • 中间各个阶段可以使用过滤操作筛选出我们想要的数据,使用转换操作对数据进行转换;
  • 调用 ObservableObserve()方法,该方法返回一个<- chan rxgo.Item。然后for range遍历即可。

结合上面的这张图,我们就比较容易理解RxGo的数据处理流程。因为例子比较简单,没有用到Map、Filter操作。

执行结果:

$ go run main.go 
1
2
3
4
5

Just使用到柯里化的编程思想。
柯里化(Currying)是一种函数式编程的技术,它将一个接受多个参数的函数转换成一系列接受单个参数的函数。这些单参数函数可以被组合起来,以便在后续的计算中使用。

柯里化的主要优点是它可以使函数更加灵活和可复用。通过将函数分解为一系列单参数函数,我们可以更容易地组合和重用这些函数,从而减少代码的重复性和冗余性。

例如:

//柯里化的例子
func addCurried(x int) func(int) int 
	return func(y int) int 
		return x + y
	


func main()  
	add5 := addCurried(5)
	fmt.Println(add5(10))

由于 Go 不支持多个可变参数,Just通过柯里化迂回地实现了这个功能:

//Just creates an Observable with the provided items.
func Just(items ...interface) func(opts ...Option) Observable 
  return func(opts ...Option) Observable 
    return &ObservableImpl
      iterable: newJustIterable(items...)(opts...),
    
  

Observe()返回一个 Item 的chan ,Item的结构如下:

// Item is a wrapper having either a value or an error.
type	Item struct 
		V interface
		E error
	

所以通过Just生成observable对象时,传入的数据可以包含错误,在使用时通过 item.Error() 来区分。

func main() 
  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch 
    if item.Error() 
      fmt.Println("error:", item.E)
     else 
      fmt.Println(item.V)
    
  

我们使用item.Error()检查是否出现错误。然后使用item.V访问数据,item.E访问错误。

除了使用for range之外,我们还可以调用 ObservableForEach()方法来实现遍历。ForEach()接受 3 个回调函数:

  • NextFunc:类型为func (v interface ),传入的数据不包含错误类型时走此函数处理。
  • ErrFunc:类型为func (err error),当传入的数据包含错误时走此函数;
  • CompletedFunc:类型为func ()Observable 完成时调用。

有点Promise那味了。使用ForEach(),可以将上面的示例改写为:

func main() 
  observable := rxgo.Just(1, 2, errors.New("这是一个测试错误!"), 4, 5)()
  <-observable.ForEach(func(v interface) 
    fmt.Println("received:", v)
  , func(err error) 
    fmt.Println("error:", err)
  , func() 
    fmt.Println("completed")
  )

$ go run main.go 
received: 1
received: 2
error: 这是一个测试错误!
received: 4
received: 5
completed

ForEach()返回的是一个 chan,用于当 observable 关闭时会向此chan发送数据。所以在 observable前面加了 <-来阻塞等待 ForEach()处理完数据。

3、RxGo 深入学习

上面的简单案例,我们是使用Just来创建observable。其实还有其他的方式创建observable。一起来看一看。

3.1 rxgo.Create

传入一个[]rxgo.Producer的切片,其中rxgo.Producer的类型为func(ctx context.Context, next chan<- Item)。我们可以在代码中调用rxgo.Of(value)生成数据,rxgo.Error(err)生成错误,然后发送到next通道中:

package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main()  
	observable := rxgo.Create([]rxgo.Producerfunc(ctx context.Context, next chan<- rxgo.Item) 
		next <- rxgo.Of(1)
		next <- rxgo.Of("aaa")
		next <- rxgo.Of(errors.New("test"))
	)

	ch := observable.Observe()
	for item := range ch 
		if item.Error() 
			fmt.Println("err:", item.E)
		else 
			fmt.Println(item.V)
		
	

因为rxgo.Create中的参数是[]rxgo.Producer,所以分成两个rxgo.Producer也是一样的效果:

observable := rxgo.Create([]rxgo.Producerfunc(ctx context.Context, next chan<- rxgo.Item) 
  next <- rxgo.Of(1)
  next <- rxgo.Of(2)
  next <- rxgo.Of(3)
  next <- rxgo.Error(errors.New("unknown"))
  , func(ctx context.Context, next chan<- rxgo.Item) 
  next <- rxgo.Of(4)
  next <- rxgo.Of(5)
)

3.2 rxgo.FromChannel

FromChannel可以直接从一个已存在的<-chan rxgo.Item对象中创建 Observable

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)



func main()  

	ch := make(chan rxgo.Item)
	go func() 
		for i := 0; i < 5; i++ 
			ch <- rxgo.Of(i)
		

		//需要手动关闭 ch 通道
		close(ch)
	()

	observable := rxgo.FromChannel(ch)
	for item := range observable.Observe() 
		if item.Error() 
			fmt.Println("err:", item.E)
		else 
			fmt.Println(item.V)
		
	

注意:

通道需要手动调用close()关闭,上面Create()方法内部rxgo自动帮我们执行了这个步骤。

func newCreateIterable(fs []Producer, opts ...Option) Iterable 
	...

	go func() 
		// Create方法内部自动关闭了 next 通道
		defer close(next)
		for _, f := range fs 
			f(ctx, next)
		
	()

	...

3.3 rxgo.Interval

Interval以传入的时间间隔生成一个无穷的数字序列,从 0 开始:

func main()  
	
	observable := rxgo.Interval(rxgo.WithDuration(time.Second))
	for item := range observable.Observe() 
		if item.Error() 
			fmt.Println("err:", item.E)
		else 
			fmt.Println(item.V)
		
	

运行后,第一秒输出 0,第二秒输出 1,以此类推。

3.4 rxgo.Range

func main() 
  observable := rxgo.Range(0, 3)
  for item := range observable.Observe() 
    fmt.Println(item.V)
  

Range可以生成一个范围内的数字:

上面代码依次输出 0,1,2,3。

3.5 Repeat

这个和之前的不太一样,这个是对已经存在的 observable对象调用 Repeat方法,从而实现重复生成数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main()  

	observable := rxgo.Range(0,3).Repeat(2, rxgo.WithDuration(time.Second))
	for item := range observable.Observe() 
		if item.Error() 
			fmt.Println("err:", item.E)
		else 
			fmt.Println(item.V)
		
	

输出:

0
1
2
0
1
2
0
1
2

注意:这里执行的次数一共是3次,Repeat中的参数是2,重复2次,一共3次。

3.6 rxgo.Start

可以给Start方法传入[]rxgo.Supplier作为参数,它可以包含任意数量的rxgo.Supplier类型。rxgo.Supplier的底层类型为:

var Supplier func(ctx context.Context) rxgo.Item

Observable 内部会依次调用这些rxgo.Supplier生成rxgo.Item

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)



func Supplier1(ctx context.Context) rxgo.Item 
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier1", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(1)


func Supplier2(ctx context.Context) rxgo.Item 
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier2", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(2)


func Supplier3(ctx context.Context) rxgo.Item 
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier3", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(3)


func main() 
	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
	observable := rxgo.Start([]rxgo.SupplierSupplier1, Supplier2, Supplier3, rxgo.WithContext(ctx))
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

4、Observable 分类

根据数据在何处生成,Observable 被分为 HotCold 两种类型。

  • Hot Observable:热可观测量,数据由可观测量外部产生。
  • Cold Observable:冷可观测量,数据由可观测量内部产生。

通常不想一次性的创建所有的数据,使用 热可观测量。

4.1 热可观测量示例

func main() 
  ch := make(chan rxgo.Item)
  go func() 
    for i := 0; i < 3; i++ 
      ch <- rxgo.Of(i)
    
    close(ch)
  ()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() 
    fmt.Println(item.V)
  

  for item := range observable.Observe() 
    fmt.Println(item.V)
  

结果:

0
1
2

上面创建的是 Hot Observable。但是有个问题,第一次Observe()消耗了所有的数据,第二个就没有数据输出了。(可以用可连接的观测量来修改这一行为,后面再说)。

4.2 冷可观测量示例

Cold Observable 就不会有这个问题,因为它创建的流是独立于每个观察者的。即每次调用Observe()都创建一个新的 channel。我们使用Defer()方法创建 Cold Observable,它的参数与Create()方法一样。

func main() 
  observable := rxgo.Defer([]rxgo.Producerfunc(_ context.Context, ch chan<- rxgo.Item) 
    for i := 0; i < 3; i++ 
      ch <- rxgo.Of(i)
    
  )

  for item := range observable.Observe() 
    fmt.Println(item.V)
  

  for item := range observable.Observe() 
    fmt.Println(item.V)
  

Defer源码介绍:

// Defer does not create the Observable until the observer subscribes,
// and creates a fresh Observable for each observer.
func Defer(f []Producer, opts ...Option) Observable 
	return &ObservableImpl
		iterable: newDeferIterable(f, opts...),
	

执行结果:

$ go run main.go
0
1
2
0
1
2

4.3 可连接的 Observable

可连接的(Connectable)Observable 对普通的 Observable 进行了一层组装。调用它的Observe()方法时并不会立刻产生数据。使用它,我们可以等所有的观察者都准备就绪了(即调用了Observe()方法)之后,再调用其Connect()方法开始生成数据。我们通过两个示例比较使用普通的 Observable 和可连接的 Observable 有何不同。

4.3.1 普通的Observable,并不是可连接的Observable
func main() 
  ch := make(chan rxgo.Item)
  go func() 
    for i := 1; i <= 3; i++ 
      ch <- rxgo.Of(i)
    
    close(ch)
  ()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface) 
    fmt.Printf("First observer: %d\\n", i)
  )

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface) 
    fmt.Printf("Second observer: %d\\n", i)
  )

  time.Sleep(3 * time.Second)

上例中我们使用DoOnNext()方法来注册观察者。由于DoOnNext()方法是异步执行的,所以为了等待结果输出,在最后增加了一行time.Sleep。运行结果:

First observer: 1
First observer: 2
First observer: 3
before subscribe second observer

由输出可以看出,注册第一个观察者之后就开始产生数据了。第二个观察者并不会得到数据。

4.3.2 可连接的Observable

通过在创建 Observable 的方法中指定rxgo.WithPublishStrategy()选项就可以创建可连接的 Observable

  • 重点是传入rxgo.WithPublishStrategy()
func main() 
  ch := make(chan rxgo.Item)
  go func() 
    for i := 1; i <= 3; i++ 
      ch <- rxgo.Of(i)
    
    close(ch)
  ()

  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

  observable.DoOnNext(func(i interface) 
    fmt.Printf("First observer: %d\\n", i)
  )

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface) 
    fmt.Printf("Second observer: %d\\n", i)
  )
	
  //需要手动调用 observable.Connect 才会产生数据
  observable.Connect(context.Background())
  time.Sleep(3 * time.Second)


运行输出:

$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3

上面是等两个观察者都注册之后,并且手动调用了 Observable 的Connect()方法才产生数据。而且可连接的 Observable 有一个特性:它是冷启动的!!!,即每个观察者都会收到一份相同的拷贝。

5、转换 Observable

通过 RxGo 数据流程图我们知道,我们可以对rxgo.Item进行转换。rxgo 提供了很多转换函数,下面一起来学一学这些转换函数。

5.1 Map

Map()方法简单修改它收到的rxgo.Item然后发送到下一个阶段(转换或过滤)。Map()接受一个类型为func (context.Context, interface) (interface, error)的函数。第二个参数就是rxgo.Item中的数据,返回转换后的数据。如果出错,则返回错误。

func main() 
	observable := rxgo.Just(1, 2, 3)()

	observable = observable.Map(func(_ context.Context, i interface) (interface, error) 
		return i.(int), nil
	).Map(func(_ context.Context, i interface) (interface, error) 
		b := i.(int)
		if b % 2 == 0 
			return nil, errors.New("test")
		 else 
			return i, nil
		
	)

	for item := range observable.Observe() 
		fmt.Println(item.V)
	

上例中每个数字经过两个Map,第一个Map逻辑是原样输出,第二个Map逻辑是判断i是不是偶数,如果是偶数,就返回错误,否则原样输出。运行结果:

1
<nil>

我们将第一个Map中的语句改为下面的逻辑:

return i.(int) + 1, nil

运行结果:

<nil>

我们可以知道,数据的处理是串行的,第一个数据执行完所有的Map过后,第二个数据才会执行,当其中某一个执行返回的结果包含错误,就不会继续进行转换了,即不会数据不会进入到 Observe() 中的通道中去。

5.2 Marshal

Marshal对经过它的数据进行一次Marshal。这个Marshal可以是json.Marshal/proto.Marshal,甚至我们自己写的Marshal函数。它接受一个类型为func(interface) ([]byte, error)的函数用于对数据进行处理。

type User struct 
  Name string `json:"name"`
  Age  int    `json:"age"`


func main() 
  observable := rxgo.Just(
    User
      Name: "dj",
      Age:  18,
    ,
    User
      Name: "jw",
      Age:  20,
    ,
  )()

  observable = observable.Marshal(json.Marshal)

  for item := range observable.Observe() 
    fmt.Println(string(item.V.([]byte)))
  

执行结果:

"name":"dj","age":18
"name":"jw","age":20

由于Marshal操作返回的是[]byte类型,我们需要进行类型转换之后再输出。

5.3 Unmarshal

既然有Marshal,也就有它的相反操作UnmarshalUnmarshal用于将一个[]byte类型转换为相应的结构体或其他类型。与Marshal不同,Unmarshal需要知道转换的目标类型,所以需要提供一个函数用于生成该类型的对象。然后将[]byte数据Unmarshal到该对象中。Unmarshal接受两个参数,参数一是类型为func([]byte, interface) error的函数,参数二是func () interface用于生成实际类型的对象。我们拿上面的例子中生成的 JSON 字符串作为数据,将它们重新UnmarshalUser对象:

type User struct 
  Name string `json:"name"`
  Age  int    `json:"age"`


func main() 
  observable := rxgo.Just(
    `"name":"dj","age":18`,
    `"name":"jw","age":20`,
  )()

  observable = observable.Map(func(_ context.Context, i interface) (interface, error) 
    return []byte(i.(string)), nil
  ).Unmarshal(json.Unmarshal, func() interface 
    return &User
  )

  for item := range observable.Observe() 
    fmt.Println(item.V)
  

由于Unmarshaller接受[]byte类型的参数,我们在Unmarshal之前加了一个Map用于将string转为[]byte。运行结果:

&dj 18
&jw 20

5.4 Buffer

Buffer按照一定的规则收集接收到的数据,然后一次性发送出去(作为切片),而不是收到一个发送一个。有 3 种类型的Buffer

  • BufferWithCount(n):每收到n个数据发送一次,最后一次可能少于n个;
  • BufferWithTime(n):发送在一个时间间隔n内收到的数据;
  • BufferWithTimeOrCount(d, n):收到n个数据,或经过d时间间隔,发送当前收到的数据。
5.4.1 BufferWithCount
func main() 
	observable := rxgo.Range(0, 5)

	observable = observable.BufferWithCount(2)

	for item := range observable.Observe() 
		fmt.Println(item.V)
	

执行结果:

[0 1]
[2 3]
[4]

最后一组只有一个。

5.4.2 BufferWithTime
unc main() 
	ch := make(chan rxgo.Item, 1)

	go func() 
		i := 0
		for range time.Tick(time.Second) 
			ch <- rxgo.Of(i)
			i++
		
	()

	observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() 
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))

	

执行结果是不确定的,这里需要注意:

startTime 2023-04-22 44:15:49
[0]
nextTime 2023-04-22 44:15:51
[1 2]
nextTime 2023-04-22 44:15:53
[3 4 5]
nextTime 2023-04-22 44:15:55
...
5.4.3 BufferWithTimeOrCount
func main() 
	ch := make(chan rxgo.Item, 1)

	go func() 
		i := 0
		for range time.Tick(time.Second) 
			ch <- rxgo.Of(i)
			i++
		
	()

	observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() 
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))
	

执行结果:

startTime 2023-04-22 44:18:48
[0]
nextTime 2023-04-22 44:18:50
[1 2]
nextTime 2023-04-22 44:18:51
[3 4]
nextTime 2023-04-22 44:18:53

BufferWithTimeOrCount是以BufferWithCount、BufferWithTime谁先满足条件为准,谁先满足谁就先执行。

5.5 GroupBy

``GroupBy将一个Observable分成多个子Observable,每个子Observable`包含相同的索引值的元素。

GroupBy函数定义如下:

GroupBy(length int, distribution func(Item) int, opts ...Option) Observable

即将一个Observable分成length个子Observable,根据distribution函数返回的int作为分组的依据。

package main

import (
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() 
	// 创建一个Observable,它发出一些整数值
	source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()

	// 使用GroupBy操作符将整数值按照奇偶性进行分组
	grouped := source.GroupBy(2, func(item rxgo.Item) int 
		return item.V.(int) % 2
	, rxgo.WithBufferedChannel(10))

	for subObservable := range grouped.Observe() 
		fmt.Println("new subObservable ------ ")
		for item := range subObservable.V.(rxgo.Observable).Observe() 
			fmt.Printf("%v\\n", item.V)
		
	


上面根据每个数模 3 的余数将整个流分为 3 组。运行:

new subObservable ------ 
2
4
6
8
10
new subObservable ------ 
1
3
5
7
9

注意rxgo.WithBufferedChannel(10)的使用,由于我们的数字是连续生成的,依次为 0->1->2->…->9->10。而 Observable 默认是惰性的,即由Observe()驱动。内层的Observe()在返回一个 0 之后就等待下一个数,但是下一个数 1 不在此 Observable 中。所以会陷入死锁。使用rxgo.WithBufferedChannel(10),设置它们之间的连接 channel 缓冲区大小为 10,这样即使我们未取出 channel 里面的数字,上游还是能发送数字进来。

6、并行操作

默认情况下,这些转换操作都是串行的,即只有一个 goroutine 负责执行转换函数。从上面的Map操作也可以得知默认是串行执行的。可以改变这一默认行为,使用rxgo.WithPool(n)选项设置运行n个 goroutine,或者rxgo.WitCPUPool()选项设置运行与逻辑 CPU 数量相等的 goroutine。

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"math/rand"
	"time"
)

func main() 
	observable := rxgo.Range(1, 10)

	observable = observable.Map(func(_ context.Context, i interface) (interface, error) 
		time.Sleep(time.Duration(rand.Int31()))
		return i.(int) + 1, nil
	, rxgo.WithCPUPool())

	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

8
9
10
6
5
11
2
4
7
3

由于是并行运算,所以结果是不固定的。

我们可以直接看官网的介绍:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

7、过滤 Observable

我们可以对Observable 中发送过来的数据进行过滤,过滤掉不需要的数据,有以下方式:

  • Filter

  • ElementAt

  • Debounce

  • Distinct

  • Skip

  • Take

下面的内容大多来自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc

7.1 Filter

Filter()接受一个类型为func (i interface) bool的参数,通过的数据使用这个函数断言,返回true的将发送给下一个阶段。否则,丢弃。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() 
	observable := rxgo.Just(1, 2, 3)().
		Filter(func(i interface) bool 
			return i != 2
		)
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

1
3

7.2 ElementAt

ElementAt()只发送指定索引的数据,如ElementAt(2)只发送索引为 2 的数据,即第 3 个数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() 
	observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

2

7.3 Debounce

只有当特定的时间跨度已经过去而没有发出另一个Item时,才从Observable发出一个Item

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main() 
	ch := make(chan rxgo.Item)

	go func() 
		ch <- rxgo.Of(1)
		time.Sleep(2 * time.Second)
		ch <- rxgo.Of(2)
		ch <- rxgo.Of(3)
		time.Sleep(2 * time.Second)
		close(ch)
	()

	observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

1
3

上面示例,先收到 1,然后 2s 内没收到数据,所以发送 1。接着收到了数据 2,由于马上又收到了 3,所以 2 不会发送。收到 3 之后 2s 内没有收到数据,发送了 3。所以最后输出为 1,3。

7.4 Distinct

Distinct()会记录它发送的所有数据,它不会发送重复的数据。由于数据格式多样,Distinct()要求我们提供一个函数,根据原数据返回一个唯一标识码(有点类似哈希值)。基于这个标识码去重。

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() 
	observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
		Distinct(func(_ context.Context, i interface) (interface, error) 
			return i, nil
		)
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

1
2
3
4
5

7.5 Skip

Skip可以跳过前若干个数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() 
	observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

3
4
5

7.6 Take

Take只取前若干个数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() 
	observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
	for item := range observable.Observe() 
		fmt.Println(item.V)
	

结果:

1
2

8、选项

因为golang中不支持默认参数,所以我们经常会用到选项设计模式,rxgo中也大量使用到了此模式。

  • rxgo.WithBufferedChannel(10):设置 channel 的缓存大小;
  • rxgo.WithPool(n)/rxgo.WithCpuPool():使用多个 goroutine 执行转换操作;
  • rxgo.WithPublishStrategy():使用发布策略,即创建可连接的 Observable

rxgo还有很多其他选项,具体看官方文档,地址:

https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

参考链接

Go 每日一库之 rxgo

[官方例子](

响应式编程详解,带你熟悉Reactor响应式编程


文章目录

  • ​​一、什么是响应式编程​​
  • ​​1、Java的流和响应式流​​
  • ​​2、Java中响应式的使用​​
  • ​​3、Reactor中响应式流的基本接口​​
  • ​​4、Reactor中响应式接口的基本使用​​
  • ​​二、初始Reactor​​
  • ​​1、Flux和Mono的基本介绍​​
  • ​​2、引入Reactor依赖​​
  • ​​3、响应式类型的创建​​
  • ​​4、响应式类型的组合​​
  • ​​(1)使用mergeWith合并响应式流​​
  • ​​(2)使用zip压缩合并响应式流​​
  • ​​(3)使用zip压缩合并为自定义对象的响应式流​​
  • ​​(4)选择第⼀个反应式类型进⾏发布​​
  • ​​5、转换和过滤反应式流​​
  • ​​(1)skip操作跳过指定数⽬的消息​​
  • ​​(2)skip()操作的另⼀种形式​​
  • ​​(3)take操作只发布第⼀批指定数量的数据项​​
  • ​​(4)take操作的另一种形式​​
  • ​​(5)filter操作自定义过滤条件​​
  • ​​(6)distinct操作去重​​
  • ​​(7)map操作映射新元素​​
  • ​​(8)flatMap将流转成新的流​​
  • ​​(9)buffer操作现将数据流拆分为小块​​
  • ​​(10)collectList操作也可以将所有数据收集到一个List​​
  • ​​(11)collectMap 操作产生⼀个发布Map的Mono​​
  • ​​6、在反应式类型上执行逻辑操作​​
  • ​​(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件​​
  • ​​(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件​​
  • ​​7、在反应式类型上使用Subscriber订阅​​
  • ​​(1)使用Subscriber消费消息​​
  • ​​(2)使用Flux的doOnNext处理数据​​
  • ​​8、使用then来处理完成数据返回​​
  • ​​写在后面​​

一、什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

在开发应⽤程序代码时,我们可以编写两种⻛格的代码,即命令式和响应式。

命令式(Imperative)的代码:它由⼀组任务组成,每次只运⾏⼀项任务,每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理,在前⼀项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下⼀项处理任务。

响应式(Reactive)的代码:它定义了⼀组⽤来处理数据的任务,但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集,并将结果交给处理流程中的下⼀项任务,同时继续处理数据的另⼀部分⼦集。

Reactor 是⼀个响应式编程库,同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。

1、Java的流和响应式流

Java的Stream流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。

响应式流⽀持异步处理任意⼤⼩的数据集,同样也包括⽆限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。

2、Java中响应式的使用

JDK1.8时,是基于Observer/Observable接口而实现的观察者模式:

ObserverDemo observer = new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer()
@Override
public void update(Observable o, Object arg)
System.out.println("发生了变化");

);
observer.addObserver(new Observer()
@Override
public void update(Observable o, Object arg)
System.out.println("收到了通知");

);
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知

JDK9及以后,Observer/Observable接口就被弃用了,取而代之的是Flow类:

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowDemo

public static void main(String[] args) throws Exception
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>()

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);


@Override
public void onNext(Integer item)
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
e.printStackTrace();


// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();


@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();


// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();


@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");


;

// 3. 发布者和订阅者 建立订阅关系
publiser.subscribe(subscriber);

// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 1000; i++)
System.out.println("生成数据:" + i);
// submit是个block方法
publiser.submit(i);


// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();

// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
// debug的时候, 下面这行需要有断点
// 否则主线程结束无法debug
System.out.println();


import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
* 带 process 的 flow demo
*/

/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String>

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);


@Override
public void onNext(Integer item)
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);

// 过滤掉小于0的, 然后发布出去
if (item > 0)
this.submit("转换后的数据:" + item);


// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();


@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();


// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();


@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();



public class FlowDemo2

public static void main(String[] args) throws Exception
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();

// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);

// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<String>()

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);


@Override
public void onNext(String item)
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();


@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();


// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();


@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");


;

// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);

// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);

// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();

// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);


3、Reactor中响应式流的基本接口

响应式流规范可以总结为4个接⼝:Publisher、Subscriber、Subscription和Processor。

Publisher负责⽣成数据,并将数据发送给 Subscription(每个Subscriber对应⼀个Subscription)。

public interface Publisher<T> 
// Publisher接⼝声明了⼀个⽅法 subscribe(),Subscriber可以通过该⽅法向 Publisher发起订阅。
public void subscribe(Subscriber<? super T> s);

⼀旦Subscriber订阅成功,就可以接收来⾃Publisher的事件。

public interface Subscriber<T> 
// Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。
public void onSubscribe(Subscription s);
// 每个数据项都会通过该方法处理
public void onNext(T t);
// 异常处理
public void onError(Throwable t);
// 结束
public void onComplete();

Publisher调⽤ onSubscribe() ⽅法时,会将Subscription对象传递给 Subscriber。

通过Subscription,Subscriber可以管理其订阅情况:

public interface Subscription 
// Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据,可以传⼊⼀个long类型的数值以表明它愿意接受多少数据
// 这也是回压能够发挥作⽤的地⽅,以避免Publisher 发送多于 Subscriber能够处理的数据量
public void request(long n);
// 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅
public void cancel();

Subscriber 请求数据之后,数据就会开始流经响应式流,调用onNext方法。

Processor接⼝,它是Subscriber和Publisher的组合:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> 

4、Reactor中响应式接口的基本使用

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

public class ReactorDemo

public static void main(String[] args)
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1个元素
// Flux 0-N个元素
String[] strs = "1", "2", "3" ;

// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>()

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;

// 请求一个数据
this.subscription.request(1);


@Override
public void onNext(Integer item)
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);

try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
e.printStackTrace();


// 处理完调用request再请求一个数据
this.subscription.request(1);

// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();


@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();


// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();


@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");


;

// 这里就是jdk8的stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最终操作
// 这里就是jdk9的reactive stream
.subscribe(subscriber);


二、初始Reactor

1、Flux和Mono的基本介绍

Reactor中有两个核心类,Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块,⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。

这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素Mono实现发布者,返回0或者1个元素。

Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

Flux和Mono共有500多个操作,这些操作都可以⼤致归类为:创建操作;组合操作;转换操作;逻辑操作。

注意!Mono和Flux的很多操作是相同的,只不过对应的数据数量不同,所以本文更多的操作都是基于Flux的,Mono也同理。

响应式编程详解,带你熟悉Reactor响应式编程_ide


响应式编程详解,带你熟悉Reactor响应式编程_数据_02

2、引入Reactor依赖

需要引入reactor-core核心包和测试包。

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.x.x</version>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.x.x</version>
<scope>test</scope>
</dependency>

3、响应式类型的创建

Reactor提供了多种创建Flux和Mono的操作。

// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 调用just或其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
// 添加一个订阅者,subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(
f -> System.out.println("Heres some fruit: " + f)
);

// 根据集合创建
String[] fruits = new String[]
"Apple", "Orange", "Grape", "Banana", "Strawberry" ;
Flux<String> fruitFlux2 = Flux.fromArray(fruits);
List<String> list = Arrays.asList(fruits);
Flux.fromIterable(list); // 集合

Stream<String> stream = list.stream();
Flux.fromStream(stream); // stream流

// 根据区间创建1-5
Flux<Integer> intervalFlux =
Flux.range(1, 5);
intervalFlux.subscribe(
f -> System.out.println("data is :" + f)
);
// 每秒发布⼀个值的Flux,通过interval()⽅法创建的Flux会从0开始发布值,并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值,所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
Flux<Long> intervalFlux2 =
Flux.interval(Duration.ofSeconds(1))
.take(5);
intervalFlux2.subscribe(
f -> System.out.println("data2 is :" + f)
);

// 阻塞,等待结果
Thread.sleep(100000);

4、响应式类型的组合

(1)使用mergeWith合并响应式流

响应式编程详解,带你熟悉Reactor响应式编程_数据_03

Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据

Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据
.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据

// 使⽤mergeWith()⽅法,将两个Flux合并,合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

mergedFlux.subscribe(System.out::println);

// 阻塞,等待结果
Thread.sleep(100000);

我们发现,使用mergeWith合并过的两个FLux,并没有严格意义上的先后之分,谁产生了数据就接着消费,与同一个无异。

(2)使用zip压缩合并响应式流

响应式编程详解,带你熟悉Reactor响应式编程_ide_04

Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
// 当两个Flux对象压缩在⼀起的时候,它将会产⽣⼀个新的发布元组的Flux,其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2(⼀个容纳两个其他对象的容器对象)的实例,其中包含了来⾃每个源Flux的数据项,并保持着它们发布的顺序。
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);

zippedFlux.subscribe(t ->
System.out.println(t.getT1() + "|" + t.getT2());
);
/**
* 执行结果:
* Garfield|Lasagna
* Kojak|Lollipops
* Barbossa|Apples
*/

(3)使用zip压缩合并为自定义对象的响应式流

如果你不想使⽤Tuple2,⽽想要使⽤其他类型,就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象,合并函数会传⼊这两个数据项。

响应式编程详解,带你熟悉Reactor响应式编程_ide_05


zip操作的另⼀种形式(从每个传⼊Flux中各取⼀个元素,然后创建消息对象,并产⽣这些消息组成的Flux)

Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");

// 压缩成自定义对象
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
zippedFlux.subscribe(System.out:: println);

/**
* 执行结果:
* Garfield eats Lasagna
* Kojak eats Lollipops
* Barbossa eats Apples
*/

(4)选择第⼀个反应式类型进⾏发布

假设我们有两个Flux对象,此时我们不想将它们合并在⼀起,⽽是想要创建⼀个新的Flux,让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux,并再次发布它的值。

响应式编程详解,带你熟悉Reactor响应式编程_数据_06

Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100)); // 延迟100ms
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
// 选择第⼀个反应式类型进⾏发布
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* hare
* cheetah
* squirrel
*/

5、转换和过滤反应式流

在数据流经⼀个流时,我们通常需要过滤掉某些值并对其他的值进⾏处理。

(1)skip操作跳过指定数⽬的消息

skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递

响应式编程详解,带你熟悉Reactor响应式编程_响应式_07

// 跳过3个,并创建一个新的Flux
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
skipFlux.subscribe(System.out::println);
/**
* 执行结果
* ninety nine
* one hundred
*/

(2)skip()操作的另⼀种形式

在⼀段时间之内跳过所有的第⼀批数据。

响应式编程详解,带你熟悉Reactor响应式编程_响应式_08

// 这是skip()操作的另⼀种形式,将会产⽣⼀个新Flux,在发布来⾃源Flux的数据项之前等待指定的⼀段时间
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1)) // 每1秒一个
.skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);

// 阻塞,等待结果
Thread.sleep(100000);

/**
* 执行结果:
* ninety nine
* one hundred
*/

(3)take操作只发布第⼀批指定数量的数据项

根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项,⽽take操作只发布第⼀批指定数量的数据项,然后将取消订阅。

响应式编程详解,带你熟悉Reactor响应式编程_数据_09

// take操作只发布传⼊Flux中前⾯指定数⽬的数据项,然后将取消订阅
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.take(3);
nationalParkFlux.subscribe(System.out::println);
/**
* 执行结果:
* Yellowstone
* Yosemite
* Grand Canyon
*/

(4)take操作的另一种形式

take()⽅法也有另⼀种替代形式,基于间隔时间⽽不是数据项个数(在指定的时间过期之前,⼀直将消息传递给结果Flux)。它将接受并发布与源Flux⼀样多的数据项,直到某段时间结束,之后Flux将会完成。

响应式编程详解,带你熟悉Reactor响应式编程_java_10

// 在订阅之后的前3.5秒发布数据条⽬。
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* Yellowstone
* Yosemite
* Grand Canyon
*/

(5)filter操作自定义过滤条件

filter操作允许我们根据任何条件进⾏选择性地发布。

响应式编程详解,带你熟悉Reactor响应式编程_响应式_11

Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.filter(np -> !np.contains(" ")); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/**
* 执行结果
* Yellowstone
* Yosemite
* Zion
*/

(6)distinct操作去重

响应式编程详解,带你熟悉Reactor响应式编程_响应式_12

Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
// 去重
animalFlux.subscribe(System.out::println);
/**
* 执行结果:
* dog
* cat
* bird
* anteater
*/

(7)map操作映射新元素

map将元素映射为新的元素,并创建一个新的Flux。

响应式编程详解,带你熟悉Reactor响应式编程_响应式_13

// map将元素映射为新的元素,并创建一个新的Flux
Flux<Integer> integerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n ->
String[] split = n.split("\\\\s");
return split.length; // 将String转为Integer
);
integerFlux.subscribe(System.out::println);

/**
* 执行结果:
* 2
* 2
* 2
*/

其中重要的⼀点是:在每个数据项被源Flux发布时,map操作是同步执⾏的,如果你想要异步地转换过程,那么你应该考虑使⽤flatMap操作。

(8)flatMap将流转成新的流

flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象,⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时,flatMap操作可以释放Reactor反应式的异步能⼒。

响应式编程详解,带你熟悉Reactor响应式编程_响应式_14

// 使⽤flatMap()⽅法和subscribeOn()⽅法
Flux<Integer> integerFlux = Flux
.just("Michael", "Scottie Pippen", "Steve Kerr Ob")
.flatMap(n -> Mono.just(n)
.map(p ->
String[] split = p.split("\\\\s");
return split.length; // 将String转为Integer
)
.subscribeOn(Schedulers.parallel()) // 定义异步
);
integerFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);

响应式编程详解,带你熟悉Reactor响应式编程_java_15

(9)buffer操作现将数据流拆分为小块

buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)

响应式编程详解,带你熟悉Reactor响应式编程_java_16

// buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
// 创建⼀个新的包含List 集合的Flux,其中每个List只有不超过指定数量的元素
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 数据切分为小块,每3个一块
bufferedFlux.subscribe(System.out::println);
/**
* 执行结果:
* [apple, orange, banana]
* [kiwi, strawberry]
*/
// 可以分片后并行执行
bufferedFlux.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
).subscribe(l ->
System.out.println(Thread.currentThread().getName() + "线程执行:" + l);
);
/**
* 执行结果(因为并行执行,结果可能不一致):
* parallel-1线程执行:APPLE
* parallel-1线程执行:ORANGE
* parallel-1线程执行:BANANA
* parallel-2线程执行:KIWI
* parallel-2线程执行:STRAWBERRY
*/
// 阻塞,等待结果
Thread.sleep(100000);

使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中:

响应式编程详解,带你熟悉Reactor响应式编程_java_17

Flux<List<String>> bufferedFlux = fruitFlux.buffer();

(10)collectList操作也可以将所有数据收集到一个List

collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。

响应式编程详解,带你熟悉Reactor响应式编程_响应式_18

Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
// 生成一个Mono,里面包含一个List
Mono<List<String>> fruitListMono = fruitFlux.collectList();

(11)collectMap 操作产生⼀个发布Map的Mono

collectMap操作将会产⽣⼀个Mono(包含了由传⼊Flux所发出的消息产⽣的Map,这个Map的key是从传⼊消息的某些特征衍⽣⽽来的)

响应式编程详解,带你熟悉Reactor响应式编程_ide_19

Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/**
* 执行结果:
* a=aardvark, e=eagle, k=kangaroo
*/

// 阻塞,等待结果
Thread.sleep(100000);

key相同的,会被覆盖。

6、在反应式类型上执行逻辑操作

(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件

响应式编程详解,带你熟悉Reactor响应式编程_ide_20

Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));

都满足条件会返回true,否则返回false。

(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件

响应式编程详解,带你熟悉Reactor响应式编程_ide_21

Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));

至少有一个满足条件,就为true,都不满足就为false。

7、在反应式类型上使用Subscriber订阅

(1)使用Subscriber消费消息

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");

stringFlux.subscribe(new Subscriber<String>()
// 保存订阅关系, 需要用它来给发布者响应
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription)
System.out.println("订阅者开始订阅");
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);


@Override
public void onNext(String item)
System.out.println("订阅者开始处理数据" + item);
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();

// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();


@Override
public void onError(Throwable t)
// 出现了异常(例如处理数据的时候产生了异常)
t.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();


@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("订阅者处理完了!");

);
/**
* 执行结果:
* 订阅者开始订阅
* 订阅者开始处理数据Apple
* 订阅者开始处理数据Orange
* 订阅者开始处理数据Grape
* 订阅者开始处理数据Banana
* 订阅者开始处理数据Strawberry
* 订阅者处理完了!
*/

// 阻塞
Thread.sleep(10000);

(2)使用Flux的doOnNext处理数据

Flux的doOnNext,会添加当Flux发出一个项目时触发的行为(副作用)。

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者处理数据:Apple
* 订阅者处理数据:Apple
* 发布者处理数据:Orange
* 订阅者处理数据:Orange
* 发布者处理数据:Grape
* 订阅者处理数据:Grape
* 发布者处理数据:Banana
* 订阅者处理数据:Banana
* 发布者处理数据:Strawberry
* 订阅者处理数据:Strawberry
*/

// 阻塞
Thread.sleep(10000);

但是!以下写法是不会触发发布者的doOnNext事件的:

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t));
stringFlux.subscribe(t -> System.out.println("订阅者处理数据:" + t));

只有链式调用,才会触发发布者的doOnNext事件。

doOnNext可以写多个,顺序执行:

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者1处理数据:" + t))
.doOnNext(t -> System.out.println("发布者2处理数据:" + t))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者1处理数据:Apple
* 发布者2处理数据:Apple
* 订阅者处理数据:Apple
* 发布者1处理数据:Orange
* 发布者2处理数据:Orange
* 订阅者处理数据:Orange
* 发布者1处理数据:Grape
* 发布者2处理数据:Grape
* 订阅者处理数据:Grape
* 发布者1处理数据:Banana
* 发布者2处理数据:Banana
* 订阅者处理数据:Banana
* 发布者1处理数据:Strawberry
* 发布者2处理数据:Strawberry
* 订阅者处理数据:Strawberry
*/

8、使用then来处理完成数据返回

Flux<String> just = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 返回一个Mono ,在此Flux完成时完成。这将主动忽略序列,只重放完成或错误信号。
just.doOnNext(t -> System.out.println("发布者处理数据:" + t))
.then(Mono.defer(() ->
return Mono.just("我完成了");
))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者处理数据:Apple
* 发布者处理数据:Orange
* 发布者处理数据:Grape
* 发布者处理数据:Banana
* 发布者处理数据:Strawberry
* 订阅者处理数据:我完成了
*/

通常来说,发布者发布完之后,都需要调用then来处理数据,或调用thenEmpty返回一个空的Mono(Mono.empty())。

写在后面

如果本文对你有帮助,请点赞收藏关注一下吧 ~

响应式编程详解,带你熟悉Reactor响应式编程_数据_22


以上是关于Go中响应式编程库RxGo详细介绍的主要内容,如果未能解决你的问题,请参考以下文章

响应式编程和Rxjs库介绍

Go Web 编程之 响应

iOS中的函数响应式编程思想

响应式编程库 RxJava 初探

Go 函数式编程篇:函数使用入门和常用内置函数介绍

『每周译Go』使用 Go 泛型的函数式编程