Flink学习之DataStream API(python版本)

Posted 柳小葱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink学习之DataStream API(python版本)相关的知识,希望对你有一定的参考价值。

💦今天我们来学习flink中较为基础的DataStream API,DataStream API用来处理流数据。本文主要是以pyflink的形式来进行讲解,对往期内容感兴趣的小伙伴👇:

💛本博客的API都是python的,根据流数据处理的不同阶段,去官方的pyflink文档中寻找对应的python API 总结而成,如有遗漏的地方,请大家指正。

目录

1. 安装pyflink

Flink支持python3.6、3.7和3.8,同时Flink1.11以后也支持windows系统了,大家只要直接运行命令即可安装。

#安装命令
python3 -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple/

我是在ubuntu中安装的,记得安装java8或11哦,出现如下界面即成功了。

2. DataStream API

DataStream API是Flink框架处理无界数据流的重要接口。前面提到,任何一个完整的Flink应用程序应该包含如下三个部分:

  • 数据源(DataSource)。
  • 转换操作(Transformation)。
  • 数据汇(DataSink)。

2.1 DataSources数据输入

  1. 从文件读取数据
env.read_text_file(file_path: str, charset_name: str = 'UTF-8')

  1. 从集合Collection中读取数据
env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)
  1. 自定义数据源
env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)
  1. 还支持其他的数据源,上面几种较为常见。

2.2 DataSteam转换操作

当Flink应用程序生成数据源后,就需要根据业务需求,通过一系列转换操作对数据流上的元素进行各种计算,从而输出最终的结果。

  1. map

有时候,我们需要对数据流上的每个元素进行处理,比如将单个文本转换成一个元组,即1对1的转换操作,此时可以通过map转换操作完成。

datastreamsource.map(func, output_type) 
#Parameters
#func – The MapFunction that is called for each element of the DataStream.
#output_type – The type information of the MapFunction output data.
#Returns
#The transformed DataStream.
  1. flat_map

在某些情况下,需要对数据流中每个元素生成多个输出,即1对N的转换操作,那么此时可以利用flatMap操作。

datastreamsource.flat_map(func, output_type) 
#Parameters
#func – The FlatMapFunction that is called for each element of the DataStream.
#output_type – The type information of output data.
#Returns
#The transformed DataStream.
  1. fliter

有时要从数据流中筛选出符合预期的数据,那就需要对数据流进行过滤处理,即利用filter转换操作。

datastreamsource.filter(func) 
#Parameters
#func – The FilterFunction that is called for each element of the DataStream.
#Returns
#The filtered DataStream.
  1. key_by

针对不同的数据流元素,有时需要根据某些字段值,作为分区的Key来并行处理数据,此时就需要用到keyBy转换操作。它将一个DataStream类型的数据流转换成一个KeyedStream数据流类型

datastreamsource.key_by(key_selector,key_type) 
#Parameters
#key_selector – The KeySelector to be used for extracting the key for partitioning.
#key_type – The type information describing the key type.
#Returns
#The DataStream with partitioned state(i.e. KeyedStream).
  1. reduce

对于分区的数据流,对数据进行reduce处理,它实际上是一种聚合操作,将两个输入元素合并成一个输出元素。它是KeyedStream流上的操作

datastreamsource.reduce(func)
#Parameters
#func – The ReduceFunction that is called for each element of the DataStream.
#Returns
#The transformed DataStream.

例如:

ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])
ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])
  1. union

在流操作场景中,有时需要合并多个流,即将多个数据流合并成一个数据流,此时可以使用union转换操作(最多合并3个)

#流1合并2,3
datastreamsource1.union(datastreamsource2,datastreamsource3) 
#Parameters
#datastreamsource – The DataStream to union outputwith.
#Returns The DataStream.

  1. connect

除了union可以合并流,还可以使用connect对2个数据流进行合并,且两个流的数据类型可以不相同。

datastreamsource.connect(ds)
#Parameters
#ds – The DataStream with which this stream will be connected.
#Returns
#The ConnectedStreams.
  1. project
#dataStreamSource.project(1, 0)方法从数据源dataStreamSource中筛选出2个字段,其字段索引分别是1和0,此时列也重新进行排序。
datastreamsource.project(*field_indexes: int) 
#Parameters
#field_indexes – The field indexes of the input tuples that are retained. The order of fields in the output tuple corresponds to the order of field indexes.
#Returns
#The projected DataStream.
  1. partition_custom

partition_custom转换操作可以根据自身需要,自行制定分区规则,partitionCustom只能对单个Key进行分区,不支持复合Key。

datastreamsource.partition_custom(partitioner, key_selector) 
#Parameters
#partitioner – The partitioner to assign partitions to keys.
#key_selector – The KeySelector with which the DataStream is partitioned.
#Returns
#The partitioned DataStream.

  1. window转换操作

Flink通过window机制,将无界数据流划分成多个有界的数据流,从而对有界数据流进行数据统计分析,window上还有多种转换操作,如max求窗口最大值,sum求窗口中元素和等。当窗口中的内置转换操作不能满足业务需求时,可以自定义内部的处理逻辑,即用apply方法传入一个自定义的WindowFunction

#CountWindow将datastream分成几个窗口
datastreamsource.CountWindow(id: int)

2.3 DataSinks数据输出

当数据流经过一系列的转换后,需要将计算结果进行输出,那么负责输出结果的算子称为Sink。

  1. sink_to
datastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink) 
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the execute() method is called.

#Parameters
#sink – The user defined sink.
#Returns
#The closed DataStream.
  1. add_sink
datastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction) 
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.

#Parameters
#sink_func – The SinkFunction object.
#Returns
#The closed DataStream.

3. DataSet

上面的部分,我们主要讲述了流处理DataStream的DataSource数据源、DataStream转换操作以及DataSink数据汇,在Flink中将批数据称为DataSet,关于批数据的处理总结如下:

  • 数据源:和DataStream相似
  • 转换操作:参考spark的批处理api
  • 数据汇:和DataStream相似

DataSet在这里就不做过多讲述。

4. 参考资料

《PyDocs》(pyflink官方文档)
《Flink入门与实战》
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》

以上是关于Flink学习之DataStream API(python版本)的主要内容,如果未能解决你的问题,请参考以下文章

Flink学习之Table API(python版本)

Flink学习之Table API(python版本)

Apache Flink -Streaming(DataStream API)

Flink —— DataStream API

Flink DataStream API

Flink DataStream API