Flink 有状态的流的工作(Working with state)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 有状态的流的工作(Working with state)相关的知识,希望对你有一定的参考价值。

参考技术A

有状态的函数和操作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂操作的关键构建部件,例如:
  当一个应用程序搜索某些特定的事件模式时,状态会保存截止到目前为止遇到过的事件的顺序;
  当每分钟聚合事件时,状态会保存挂起的聚合
  当通过数据点来训练机器学习模型时,状态会保存当前版本的模型参数

为了使state容错,Flink需要识别state并 checkpoint 它, 在许多情况下,Flink还管理着应用程序的状态,这意味着Flink处理内存管理(如果需要,可能会将内存中的数据溢出到磁盘)来保存非常大的state。

这篇文档介绍了在开发应用程序时如何使用Flink的state 抽象概念。

在Flink中有两个基本的state:Keyed state和 Operator state

Keyed State 总是与key相关,并且只能应用于 KeyedStream 的函数和操作中。

你可以认为 Keyed State 是一个已经分区或者划分的,每个state分区对应一个key的 Operator State , 每个 keyed-state 逻辑上与一个<并行操作实例, 键>( <parallel-operator-instance, key> )绑定在一起,由于每个key属于唯一一个键控算子( keyed operator )的并行实例,我们可以简单地看作是 <operator, key> 。

Keyed State 可以进一步的组成 Key Group , Key Group 是Flink重新分配 Keyed State 的最小单元,这里有跟定义的最大并行数一样多的 Key Group ,在运行时 keyed operator 的并行实例与key一起为一个或者多个 Key Group 工作。

使用 Operator State (或者非键控的state)的话,每个算子状态绑定到一个并行算子实例中。 Kafka Connector 就是在Flink中使用 Operator State 的一个很好的例子,每个 Kafka consumer 的并行实例保存着一个 topic 分区和偏移量的map作为它的 Operator State 。
当并行数发生变化时, Operator State 接口支持在并行操作实例中进行重新分配,这里有多种方法来进行重分配。

Keyed State和 Operator State存在两种形式:托管的和原生的。

托管的State( Managed State )由Flink运行时控制的数据结构表示, 例如内部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink运行时会对State编码并将它们写入checkpoint中。

原生State( Raw State )是算子保存它们自己的数据结构的state,当checkpoint时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。

所有的数据流函数都可以使用托管state,但是原生state接口只能在实现operator时才能使用。使用托管State(而不是原生state)被推荐使用是因为使用托管state,当并行度发生变化时,Flink可以自动地重新分配state,同时还能更好地管理内存。

托管的键控state接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过stream.keyBy(...)创建的KeyedStream使用。
现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原形如下:
ValueState<T>:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个操作的每个key可能有一个值)。这个值可以使用 update(T) 来更新,使用 T value() 来获取。

ListState<T>:这个保存了一个元素列表,你可以追加元素以及获取一个囊括当前所保存的所有元素的 Iterable ,元素可以通过调用 add(T) 来添加,而 Iterable 可以调用 Iterable<T> get() 来获取。

ReducingState<T>:这个保存了表示添加到state的所值的聚合的当个值,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的ReduceFunction聚合成一个聚合值。

FoldingState<T, ACC>:这将保存表示添加到状态的所有值的聚合的单个值,与ReducingState相反,聚合的数据类型可能跟添加到State的元素的数据类型不同,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合值。

MapState<T>:这个保存了一个映射列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射可以使用 put(UK, UV) 或者 putAll(Map<UK, UV>) 来添加。与key相关的value,可以使用 get(UK) 来获取,映射的迭代、keys及values可以分别调用 entries() , keys() 和 values() 来获取。

所有类型的state都有一个 clear() 方法来清除当前活动的key(及输入元素的key)的State。

注意: FoldingState会在下一个Flink版本中启用,并在将来的版本中彻底删除,将提供更加一般的替代方案。

值得注意的是这些State对象仅用于与State进行接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么在一次调用用户函数中获得的值可能与另一次调用的值不同。

为了获得一个State句柄,你需要创建一个 StateDescriptor ,这个 StateDescriptor 保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的值以便你能够引用它们),State保存的值的类型以及用户自定义函数如:一个 ReduceFunction 。根据你想要检索的state的类型,你可以创建一个 ValueStateDescriptor , 一个 ListStateDescriptor , 一个 ReducingStateDescriptor , 一个 FoldingStateDescriptor 或者一个 MapStateDescriptor

State可以通过 RuntimeContext 来访问,所以只能在富函数中使用。 RichFunction 中的 RuntimeContext 有以下这些方法来访问state:
   ValueState<T> getState(ValueStateDescriptor<T>)
   ReducingState<T> getReducingState(ReduceingStateDescriptor<T>)
   ListState<T> getListState(ListStateDescriptor<T>)
   FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
   MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这个 FlatMapFunction 例子展示了所有部件如何组合在一起:

这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key都是1),这个函数将计数和运行时总和保存在一个ValueState中,一旦计数大于2,就会发出平均值并清理state,因此我们又从0开始。请注意,如果我们在第一个字段中具有不同值的元组,则这将为每个不同的输入键保持不同的state值。

除了上述接口之外,Scala API还具有快捷方式在KeyedStream上通过有状态的 map() 或 flatMap() 函数获取当个ValueState, 用户定义的Function以一个Option形式来获取ValueState的当前值,并且必须返回一个更新的值来更新State。

为了使用托管的算子State,有状态的函数可以实现更加通用的CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口

CheckpointedFunction接口可以通过不同的重分区模式来访问非键控的state,它需要实现两个方法:

无论何时执行checkpoint, snapshotState() 都会被调用,相应地,每次初始化用户定义的函数时,都会调用对应的 initializeState() ,当函数首次初始化时,或者当该函数实际上是从较早的检查点进行恢复时调用的。鉴于此, initializeState() 不仅是不同类型的状态被初始化的地方,而且还是state恢复逻辑的地方。

目前列表式托管算子状态是支持的,State要求是一个可序列化的彼此独立的列表,因此可以在重新调整后重新分配,换句话说,这些对象是可重新分配的非键控state的最小粒度。根据状态的访问方法,定义了一下重分配方案:
Even-split redistribution :每个算子返回一个State元素列表,
Union redistribution :每个算子返回一个State元素列表,

Apache Flink 数据流编程模型

抽象等级(Levels of Abstraction)

Flink提供不同级别的抽象来开发流/批处理应用程序。

Statefule Stream Processing: 是最低级别(底层)的抽象,只提供有状态的流。它通过ProcessFunction嵌入到DataStream API之中。它使得用户可以自由处理来源于一个或者多个流的事件

DataStream/DataSet API: 在我们的实际工作中,大多数的应用程序是不需要上文所描述的低级别(底层)抽象,而是相对于诸如DataStream API(有界/无界流)和DataSet API(有界数据集)的Core API进行编程。这些API提供了用于数据处理的通用模块,如各种指定的transformations, joins, aggregations, windows, state等。在API中,这些处理的数据类型都是一个具体的实体类(class)。

底层的Process Function与DataStream API集成在一起,可以仅对一些操作进行底层抽象。

Table API: 是围绕着table的申明性DSL,可以被动态的改变(当其表示流时)。Table API遵循(扩展)关系模型:表有一个模式链接(类似与在关系数据库中的表),API也提供了一些类似的操作:select, project, join, group-by, aggregate等。Table API程序申明定义了怎么做是规范的,而不是明确指定应该是什么样子的。虽然Table API可以通过各种类型的用户定义的函数进行扩展,但它比Core API表达的更少,但使用起来更简洁(少写代码)。另外,Table API程序也会通过一个优化器,在执行之前应用优化规则。

可以在表和DataStream / DataSet之间进行无缝转换,允许程序混合使用Table API和DataStream 和DataSet API。

Flink提供的最高级抽象是SQL。 这种抽象在语义和表现力方面与Table API类似,但是将程序表示为SQL查询表达式。在SQL抽象与Table API紧密地相互作用,另外,SQL查询可以在Table API中定义的表上执行。

程序和数据流(Programs and Dataflows)

Flink可以说是由流(streams)和转换(transformations)为基础构建的(请注意,Flink的DataSet API中使用的数据集也是内部的流 )。从概念上讲,流是数据记录(可能是永无止境的)流,而转换是将一个或多个流作为输入,并产生一个或多个输出流。

执行时,Flink程序被映射到由流和转换运算符组成的流式数据流。每个数据从一个或多个源(sources)开始,并在一个或者多个接收器(sinks)中结束。数据流类似于一个任意有向无环图(DAG)。尽管通过迭代构造允许特殊形式的循环,但是为了简单起见,我们姑且先忽视这种情况。

程序中的转换与数据流中的操作符通常是一一对应的。然而,有时候,一个转换可能由多个转换操作符组成。

信号源(sources)和接收器(sinks)记录在流式连接器批量连接器文档中。DataStream运算符DataSet转换中记录了转换。

并行数据流(Parallel Dataflows)

Flink中的程序本质上是并行和分布的。在执行过程中,一个流有一个或者多个流分区,每个运算符有一个或者多个子任务。操作符子任务彼此独立,并且在不同的线程中执行,并且可能在不同的机器或容器上执行。

操作符子任务的数量是该特定操作符的并行度。流的并行性总是由生产它的操作符决定。同一个程序的不同运算符可能有不同的并行级别。

流可以以一对一(One-to-one)或者重新分配(Redistributing)的模式在两个操作符之间传输:

One-to-one: 保留了元素的分区和顺序,如上图中source —>map。这意味着map运算符的subtask[1]将按照源运算符的subtask[1]所产生顺序相同。

Redistributing: 如上图所示,map和keyBy/window之间,以及keyBy/window和Sink之间重新分配流,将会改变流的分区。每个操作符子任务根据所选的转换将数据发送到不同的目标子任务。比如 keyBy()(其通过哈希重新分区),broadcast(), or rebalance() (其随机重新分区)。在重新分配 交换中,元素之间的排序只保存在每对发送和接收的子任务中(例如map() 的subtask[1] 和 keyBy /window的subtask [2])。所以在这个例子里,每个关键字中的排序都被保留下来,但是并行性确实造成了不同关键字汇总结果后顺序的非确定性。

有关配置和控制并行的细节可以在并行执行的文档中找到。

窗口(Windows)

聚合事件(如:sum,count,etc)在流上的工作方式与批处理中的不同。例如,我们不能够去统计流中的所有元素,因为流一般是无限的(无界的)。因而,流中的一些aggregate操作,是由Windows控制的,例如:计算过去五分钟或者最后100个元素的总和。

Windows可以是由时间驱动的(例如,每30秒)或者数据驱动(例如每100个元素)。这可以用来区分不同类型的Windows,例如:tumbling windows (no overlap), sliding windows (with overlap), and session windows (punctuated by a gap of inactivity).

更多的窗口示例可以在这篇博客文章中找到。更多细节在窗口文档

时间(Time)

当我们在流式编程中谈及时间时,可以参考不同的时间概念:

Event Time, 是事件创建的时间,通常用时间戳表示。Flink通过时间戳分配器来访问事件时间戳。

Ingestion time, 是事件进入Flink的时间,在源操作中每个记录都会获得源的当前时间作为时间戳,后续基于时间的操作(如: time window)会依赖这个时间戳

Processing Time, 是指each operator 执行程序时对应的物理机的系统时间

有关如何处理时间的更多细节,请参阅event time 文档

有状态的操作(Stateful Operations)

尽管数据流中很多操作看起来像一个单独的事件,但是一些操作会跨越几个事件记下相关的的信息(比如像window operators)。这种操作被称为有状态的(stateful)。

这种有状态的操作,被保存在一种key/value的存储结构之中。状态与有状态操作符读取的流严格分区和分配。只有在keyed()函数之后才能访问key/value状态。并且仅限于与当前事件的键相关的值。流和状态的keys的匹配保证了所有状态更新都是本地操作,保证了一致性,所以不需要事务的开销。这种匹配还允许flink重新分配状态,并公开的调整分区。

有关更多信息,请参阅有关状态的文档

检查点容错(Checkpoints for Fault Tolerance)

Flink使用流重播(stream replay)检查点(checkpointing) 的组合来实现容错。检查点与每个输入流中的特定点以及每个操作元的相应状态有关。数据流可以从检查点恢复,同时保持一致性(exactly-once processing语义),方法是恢复operators 的状态并从检查点重放事件。

检查点间隔是在执行恢复时间(需要被重放的事件的数量)的情况下折衷的容错开销手段。

容错内部的描述提供了有关Flink如何管理检查点和相关主题的更多信息。有关启用和配置检查点的详细信息位于检查点API文档中。

批处理流(Batch on Streaming)

Flink执行批处理程序作为流程序的特殊情况,它是有限的(元素是有限的)。A*DataSet *在内部视为数据流。因此,上述概念同样适用于批处理程序,就像适用于流式处理程序一样,但有一点例外:

  • 批处理程序的容错不使用检查点。通过完全重放流来恢复。这是可能的,因为输入是有限的。这将成本更多推向recovery,但是使常规地处理更便宜,因为它避免了检查点。

  • DataSet API中的有状态操作使用简化的内存/外核数据结构,而不是键/值索引。

  • DataSet API引入了特殊的同步(超级)iterations,这只能在有界的流上进行。有关详细信息,请查看iterations文档

以上是关于Flink 有状态的流的工作(Working with state)的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink 数据流编程模型

Flink状态管理和容错机制介绍

Flink 如何实现新的流处理应用第二部分:版本化状态

Flink 如何实现新的流处理应用第二部分:版本化状态

如何使用 api rest 传递 flink 流作为参数并返回转换后的流

如何使用api rest通过flink流作为参数并返回转换后的流