Flink学习之DataStream API(python版本)
Posted 柳小葱
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink学习之DataStream API(python版本)相关的知识,希望对你有一定的参考价值。
💦今天我们来学习flink中较为基础的DataStream API,DataStream API用来处理流数据。本文主要是以pyflink的形式来进行讲解,对往期内容感兴趣的小伙伴👇:
- hadoop专题: hadoop系列文章.
- spark专题: spark系列文章.
- flink专题: Flink系列文章.
💛本博客的API都是python的,根据流数据处理的不同阶段,去官方的pyflink文档中寻找对应的python API 总结而成,如有遗漏的地方,请大家指正。
目录
1. 安装pyflink
Flink支持python3.6、3.7和3.8,同时Flink1.11以后也支持windows系统了,大家只要直接运行命令即可安装。
#安装命令
python3 -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple/
我是在ubuntu中安装的,记得安装java8或11哦,出现如下界面即成功了。
2. DataStream API
DataStream API是Flink框架处理无界数据流的重要接口。前面提到,任何一个完整的Flink应用程序应该包含如下三个部分:
- 数据源(DataSource)。
- 转换操作(Transformation)。
- 数据汇(DataSink)。
2.1 DataSources数据输入
- 从文件读取数据
env.read_text_file(file_path: str, charset_name: str = 'UTF-8')
- 从集合Collection中读取数据
env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)
- 自定义数据源
env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)
- 还支持其他的数据源,上面几种较为常见。
2.2 DataSteam转换操作
当Flink应用程序生成数据源后,就需要根据业务需求,通过一系列转换操作对数据流上的元素进行各种计算,从而输出最终的结果。
- map
有时候,我们需要对数据流上的每个元素进行处理,比如将单个文本转换成一个元组,即1对1的转换操作,此时可以通过map转换操作完成。
datastreamsource.map(func, output_type)
#Parameters
#func – The MapFunction that is called for each element of the DataStream.
#output_type – The type information of the MapFunction output data.
#Returns
#The transformed DataStream.
- flat_map
在某些情况下,需要对数据流中每个元素生成多个输出,即1对N的转换操作,那么此时可以利用flatMap操作。
datastreamsource.flat_map(func, output_type)
#Parameters
#func – The FlatMapFunction that is called for each element of the DataStream.
#output_type – The type information of output data.
#Returns
#The transformed DataStream.
- fliter
有时要从数据流中筛选出符合预期的数据,那就需要对数据流进行过滤处理,即利用filter转换操作。
datastreamsource.filter(func)
#Parameters
#func – The FilterFunction that is called for each element of the DataStream.
#Returns
#The filtered DataStream.
- key_by
针对不同的数据流元素,有时需要根据某些字段值,作为分区的Key来并行处理数据,此时就需要用到keyBy转换操作。它将一个DataStream类型的数据流转换成一个KeyedStream数据流类型
datastreamsource.key_by(key_selector,key_type)
#Parameters
#key_selector – The KeySelector to be used for extracting the key for partitioning.
#key_type – The type information describing the key type.
#Returns
#The DataStream with partitioned state(i.e. KeyedStream).
- reduce
对于分区的数据流,对数据进行reduce处理,它实际上是一种聚合操作,将两个输入元素合并成一个输出元素。它是KeyedStream流上的操作
datastreamsource.reduce(func)
#Parameters
#func – The ReduceFunction that is called for each element of the DataStream.
#Returns
#The transformed DataStream.
例如:
ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])
ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])
- union
在流操作场景中,有时需要合并多个流,即将多个数据流合并成一个数据流,此时可以使用union转换操作(最多合并3个)
#流1合并2,3
datastreamsource1.union(datastreamsource2,datastreamsource3)
#Parameters
#datastreamsource – The DataStream to union outputwith.
#Returns The DataStream.
- connect
除了union可以合并流,还可以使用connect对2个数据流进行合并,且两个流的数据类型可以不相同。
datastreamsource.connect(ds)
#Parameters
#ds – The DataStream with which this stream will be connected.
#Returns
#The ConnectedStreams.
- project
#dataStreamSource.project(1, 0)方法从数据源dataStreamSource中筛选出2个字段,其字段索引分别是1和0,此时列也重新进行排序。
datastreamsource.project(*field_indexes: int)
#Parameters
#field_indexes – The field indexes of the input tuples that are retained. The order of fields in the output tuple corresponds to the order of field indexes.
#Returns
#The projected DataStream.
- partition_custom
partition_custom转换操作可以根据自身需要,自行制定分区规则,partitionCustom只能对单个Key进行分区,不支持复合Key。
datastreamsource.partition_custom(partitioner, key_selector)
#Parameters
#partitioner – The partitioner to assign partitions to keys.
#key_selector – The KeySelector with which the DataStream is partitioned.
#Returns
#The partitioned DataStream.
- window转换操作
Flink通过window机制,将无界数据流划分成多个有界的数据流,从而对有界数据流进行数据统计分析,window上还有多种转换操作,如max求窗口最大值,sum求窗口中元素和等。当窗口中的内置转换操作不能满足业务需求时,可以自定义内部的处理逻辑,即用apply方法传入一个自定义的WindowFunction
#CountWindow将datastream分成几个窗口
datastreamsource.CountWindow(id: int)
2.3 DataSinks数据输出
当数据流经过一系列的转换后,需要将计算结果进行输出,那么负责输出结果的算子称为Sink。
- sink_to
datastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink)
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the execute() method is called.
#Parameters
#sink – The user defined sink.
#Returns
#The closed DataStream.
- add_sink
datastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction)
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.
#Parameters
#sink_func – The SinkFunction object.
#Returns
#The closed DataStream.
3. DataSet
上面的部分,我们主要讲述了流处理DataStream的DataSource数据源、DataStream转换操作以及DataSink数据汇,在Flink中将批数据称为DataSet,关于批数据的处理总结如下:
- 数据源:和DataStream相似
- 转换操作:参考spark的批处理api
- 数据汇:和DataStream相似
DataSet在这里就不做过多讲述。
4. 参考资料
《PyDocs》(pyflink官方文档)
《Flink入门与实战》
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》
以上是关于Flink学习之DataStream API(python版本)的主要内容,如果未能解决你的问题,请参考以下文章