Flink DataSet编程指南-demo演示及注意事项
Posted Spark高级玩法
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink DataSet编程指南-demo演示及注意事项相关的知识,希望对你有一定的参考价值。
Flink中的DataStream程序是对数据流进行转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。数据流的最初的源可以从各种来源(例如,消息队列,套接字流,文件)创建,并通过sink返回结果,例如可以将数据写入文件或标准输出。Flink程序以各种上下文运行,独立或嵌入其他程序中。执行可能发生在本地JVM或许多机器的集群上。
一,示例程序
改代码可以直接粘贴复制到你自己的工程,只需要导入Flink的相关依赖,具体工程构建方法,请参考。
object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, (1,2)) }
.groupBy(0)
.sum(1)
// execute and print result
counts.print()
}
}
二,转换动作
数据转换将一个或多个DataSet转换为新的DataSet。程序可以将多个转换组合成复杂的程序集。
1),Map
取出一个元素转换为另一个元素。
data.map { x => x.toInt }
2),FlatMap
取出一个元素并产生零个,一个或多个元素。
data.flatMap { str => str.split(" ") }
3),MapPartition
在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,可以产生任意数量的结果。每个分区中的元素数量取决于并行度和以前的操作。
data.mapPartition { in => in map { (_, 1) } }
4),Filter
每个元素都会调用布尔函数,并保留函数返回true的元素。
data.filter { _ > 1000 }
5),Reduce
通过重复的将两个元素合并成一个元素,进而将一组元素合并成一个元素。Reduce操作可以应用在完整的数据集上,也可以应用在分组的数据集上。
data.reduce { _ + _ }
6),ReduceGroup
将一组元素组合成一个或多个元素。 ReduceGroup可以应用于完整数据集,也可以应用在分组数据集上。
data.reduceGroup { elements => elements.sum }
7),Aggregate
将一组值聚合为单个值。 聚合函数可以被认为是内置的减少函数。 聚合可以应用在完整的数据集上,也可以应用在分组的数据集上。
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Doublr)] = input.aggregate(SUM, 0).aggregate(MIN, 2);
您还可以使用最小,最大和总和的短语法。
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
8),Distinct
返回数据集中不相同的元素。 它从输入DataSet中删除重复条目,依据元素的所有字段或字段的子集。
data.distinct()
9),Join
根据两个数据集指定的相等的key,进行join,这是一个inner join。可选的:可以使用JoinFunction将该对元素转化为单个元素。也可以用FlatJoinFunction将该对元素转化为任意多个元素,包括无。
// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)
您可以通过连接提示(Join Hints)来指定运行时执行连接的方式。这些提示描述了连接是通过分区还是广播发生,以及它是使用基于排序的还是基于散列(hash-based)的算法。有关提示后面会给出详细的介绍。如果没有指定链接方式,系统将尝试对输入大小进行评估,并根据这些评估选择最佳策略。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcasted data
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where(0).equalTo(1)
10),OuterJoin
对两个数据集,进行left, right, or full outer join 。OuterJoin类似于常规(内部)连接,并创建在其键上相等的所有元素对。对于根据key没找到的键,则保留满足链接要求的记录,(left,保留join左侧数据集未匹配到的;right join保留右侧数据集的未匹配到;full保留所有数据集未匹配到的。)。此时也可以使用FlatJoinFunction将一对元素转化为任意数量的元素,也可以没有。也可用JoinFunction将一对元素转化为单个元素。
val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
(left, right) =>
val a = if (left == null) "none" else left._1
(a, right)
}
11),CoGroup
Reduce操作的二维变体。对一个或多个字段中的每个输入进行分组,然后加入组。每对组调用转换函数。
data1.coGroup(data2).where(0).equalTo(1)
关于如何定义,key,请参考<>.
12),Cross
构建两个输入的笛卡尔乘积(交叉乘积),创建所有元素对。 可选地,使用CrossFunction将该对元素转换为单个元素
val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)
注意:
Cross操作是一个计算密集型的操作,甚至会对大型集群带来计算挑战。建议使用crossWithTiny()和crossWithHuge()来提示适应DataSet大小的系统。
13),Union
生成两个数据集的并集。
data.union(data2)
14),Rebalance
均匀地重新平衡数据集的并行分区以消除数据倾斜。 只有类似Map的转换可以跟着rebalance操作使用。
val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
15),Hash-Partition
根据给定的key对一个数据集进行hash分区。Key可以是position keys, expression keys, 或者key selector functions.具体请参考<>
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
16),Range-Partition
根据给定的key对一个数据集进行Range分区。Key可以是position keys, expression keys, 或者key selector functions.具体请参考<>
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
17),Custom Partitioning
手动指定一个分区器,该方法只能使用一个key字段。
val in: DataSet[(Int, String)] = // [...]
val result = in
.partitionCustom(partitioner: Partitioner[K], key)
18),Sort Partition
以指定的顺序在指定的字段上本地对数据集的所有分区进行排序。 字段可以指定为元组位置或字段表达式。 通过链接sortPartition()调用来完成对多个字段的排序。
val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
19),First-n
返回数据集的前n个(任意)元素。 first-n可以应用于常规数据集,分组数据集或分组排序的数据集。 分组键可以指定为键选择器函数,元组位置或case class字段。
val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
20),MinBy / MaxBy
从一个或多个字段的值为最小值(最大值)的一组元组中选择一个元组。用于比较的字段必须是有效的关键字段,即可比较的。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。
val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
.minBy(1)
例子,通过匿名模式匹配从元组,case class和集合中提取,如下所示:
val data: DataSet[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
并不支持API开箱即用。要使用此功能,您应该使用Scala API扩展。转换的并行性可以由setParallelism(int)定义,而name(String)将自定义名称分配给转换操作,有助于调试的。withParameters(Configuration)通过配置对象,可以从用户函数中的open()方法访问。
三,Data Sources
1,文件格式
数据源创建初始数据集,例如从文件或从Java集合。创建数据集的一般机制根据InputFormat抽象出来。Flink带有几种内置格式,可以从常见的文件格式创建数据集。
A),File-based
a) readTextFile(path) / TextInputFormat:按行读,返回整行字符串。
b) readTextFileWithValue(path) / TextValueInputFormat:读取文件行,并将其作为StringValues返回。 StringValues是可变字符串。
c) readCsvFile(path) / CsvInputFormat:解析逗号(或另一个char)分隔字段的文件。返回元组,case class或POJO的DataSet。支持基本的java类型及其Value对应的字段类型。
d) readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat:使用给定的分隔符解析新行(或另一个字符序列)文件分隔的基本数据类型,例如String或Integer。
e) readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat:创建一个JobConf,并使用指定的FileInputFormat,Key类和Value类从指定的路径读取文件,并将它们返回为Tuple2 <Key,Value>。
f) readSequenceFile(Key, Value, path) / SequenceFileInputFormat:创建JobConf并从类型为SequenceFileInputFormat,Key类和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。
B),Collection-based
a) fromCollection(Seq):从Seq创建一个数据集。集合中的所有元素必须是相同的类型。
b) fromCollection(Iterator) :从迭代器创建一个数据集。 该类指定迭代器返回的元素的数据类型。
c) fromElements(elements: _*):从迭代器中并行创建一个数据集。 该类指定迭代器返回的元素的数据类型。
d) enerateSequence(from, to):并行生成给定间隔的数字序列。
C),Generic
a) readFile(inputFormat, path) / FileInputFormat:接受文件输入格式。
b) createInput(inputFormat) / InputFormat:接受通用输入格式。
例子:
val env = ExecutionEnvironment.getExecutionEnvironment
// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")
// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
"hdfs:///the/CSV/file",
pojoFields = Array("name", "age", "zipcode"))
// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")
// generate a number sequence
val numbers = env.generateSequence(1, 10000000);
// read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
"hdfs://nnHost:nnPort/path/to/file")
2,配置CSV解析
Flink为CSV解析提供了许多配置选项:
A),lineDelimiter: String:指定单个记录的分隔符。 默认行分隔符是新行字符“\ n”。
B),fieldDelimiter: String:指定分隔记录字段的分隔符。 默认字段分隔符是逗号字符','。
C),includeFields: Array[Int] :定义要从输入文件读取哪些字段(以及要忽略哪些)。 默认情况下,前n个字段(由types()调用中的类型数定义)被解析。
D),pojoFields: Array[String] :指定映射到CSV字段的POJO的字段。 根据POJO字段的类型和顺序自动初始化CSV字段的解析器。
E),parseQuotedStrings: Character:允许引用字符串解析。如果字符串字段的第一个字符是引号(引导或拖尾空格未修剪),则字符串将被解析为引用的字符串。引用字符串中的字段分隔符将被忽略。如果引用的字符串字段的最后一个字符不是引号字符,引用的字符串解析将失败。如果启用了引用的字符串解析,并且该字段的第一个字符不是引用字符串,那么该字符串将被解析为无引号的字符串。默认情况下,禁用引用字符串解析。
F),ignoreComments: String :指定注释前缀。 以指定注释前缀开始的所有行都不会被解析和会被忽略。 默认情况下,不会忽略任何行。
G),lenient: Boolean:使得能够轻松解析,即无法正确解析的行被忽略。 默认情况下,宽大的解析被禁用,无效的行引发异常。
H),ignoreFirstLine: Boolean:将InputFormat配置为忽略输入文件的第一行。 默认情况下不会忽略任何行。
3,输入路径目录的递归遍历
对于基于文件的输入,当输入路径是目录时,默认情况下不嵌套嵌套文件。相反,仅读取基本目录中的文件,而忽略嵌套文件。可以通过recursive.file.enumeration配置参数启用嵌套文件的递归枚举,如下例所示。
// enable recursive enumeration of nested input files
val env = ExecutionEnvironment.getExecutionEnvironment
// create a configuration object
val parameters = new Configuration
// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)
// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
4,读取压缩文件
Flink目前支持输入文件的透明解压缩,如果这些文件标有适当的文件扩展名。特别地,这意味着不需要进一步配置输入格式,并且任何FileInputFormat都支持压缩,包括自定义输入格式。 请注意,压缩文件可能不支持并行读取,从而影响作业可扩展性。
下表列出了当前支持的压缩方法。
压缩方式 |
文件扩展名 |
可并行 |
DEFLATE |
.deflate |
no |
GZip |
.gz, .gzip |
no |
Bzip2 |
.bz2 |
no |
XZ |
.xz |
no |
四,Data Sinks
1,输出主要方法
Data sinks从DataSet中取出消息保存或者返回。使用OutputFormat描述data sink操作。Flink带有各种内置的输出格式:
A),writeAsText() / TextOutputFormat:将元素以字符串形式写入。字符串通过调用每个元素的toString()方法获得。
B),writeAsCsv(...) / CsvOutputFormat:将元组写入逗号分隔的值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
C),print() / printToErr():在标准输出/标准错误流中打印每个元素的toString()值。
D),write() / FileOutputFormat:自定义文件输出的方法和基类。 支持自定义对象到字节转换。大多数通用输出方法,用于不是基于文件的data sinks(例如将结果存储在数据库中)。
E),output()/ OutputFormat:
DataSet可以输入到多个操作。 程序可以写入或打印数据集,同时在其上运行其他转换。
2,例子
标准data sink方式:
// text data
val textData: DataSet[String] = // [...]
// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")
// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")
// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)
// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");
// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
.writeAsText("file:///path/to/the/result/file")
3,本地排序输出
输出可以使用元组字段位置或字段表达式以指定的顺序在指定的字段上进行本地排序。 这适用于每种输出格式。
val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]
// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print;
// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print;
// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...);
// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...);
// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...);
五,Iteration Operators
迭代在Flink程序中实现循环。迭代运算符封装了程序的一部分并重复执行,将一次迭代的结果(部分解)反馈到下一次迭代中。Flink两种迭代类型:BulkIteration和DeltaIteration
后面会出文章详细介绍flink的迭代类型。
1,批量迭代
要创建一个BulkIteration,调用DataSet的iterate(int)方法,迭代需要从它开始。这将返回一个IterativeDataSet,它可以用常规运算符进行转换。迭代调用的单个参数指定最大迭代次数。
要指定迭代结束,在IterativeDataSet上调用closeWith(DataSet)方法来指定哪个转换应该反馈到下一个迭代。您可以选择使用closeWith(DataSet,DataSet)指定终止条件,如果该DataSet为空,则它将评估第二个DataSet并终止迭代。如果没有指定终止条件,则迭代在给定的最大次数迭代后终止。
以下示例迭代地估计Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果这一点在单位圆内,我们增加计数。最后count除以迭代次数乘以4就是PI值。
val env = ExecutionEnvironment.getExecutionEnvironment()
// Create initial DataSet
val initial = env.fromElements(0)
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
val result = iterationInput.map { i =>
val x = Math.random()
val y = Math.random()
i + (if (x * x + y * y < 1) 1 else 0)
}
result
}
val result = count map { c => c / 10000.0 * 4 }
result.print()
env.execute("Iterative Pi Example");
后面要讲的 K-Means的例子就是基于这个实现的。
2,增量迭代
Delta迭代利用某些算法在每次迭代中不改变解的每个数据点的特点。除了每次迭代返回的部分结果外,增量迭代还保持了跨越迭代维护的状态(被叫做解集),可以通过增量更新。迭代计算的结果是上次迭代后的状态。更深一层的迭代相关的原理,后面会陆续出文章介绍。
定义DeltaIteration类似于定义BulkIteration。对于增量迭代,两个数据集形成每个迭代的输入(工作集和解集),并且在每个迭代中产生两个数据集作为结果(新的工作集,解集集增量)。
要创建一个DeltaIteration,在初始解决方案集上调用iterateDelta(initialWorkset,maxIterations,key)。step函数有两个参数:(solutionSet,workset),并且必须返回两个值:(solutionSetDelta,newWorkset)。
下面是一个delta迭代语法的例子:
// read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]
val initialWorkset: DataSet[(Long, Double)] = // [...]
val maxIterations = 100
val keyPosition = 0
val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
(solution, workset) =>
val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
val nextWorkset = deltas.filter(new FilterByThreshold())
(deltas, nextWorkset)
}
result.writeAsCsv(outputPath)
env.execute()
六,在函数中对数据对象进行操作
Flink的runtime 以Java对象的形式与用户函数交换数据。函数从runtime 接收输入对象作为方法参数,并返回输出对象作为结果。由于这些对象是由用户函数和运行时代码访问的,因此了解并遵循关于用户代码如何访问,即读取和修改这些对象的规则是非常重要的。
用户函数从常规方法参数(如MapFunction)或通过Iterable参数(如GroupReduceFunction)接收来自Flink 的runtime 的对象。我们将运行时传递给用户函数的对象称为输入对象。用户函数可以将对象作为方法返回值(如MapFunction)或通过Collector (如FlatMapFunction)发送到Flink的runtime 。我们将用户函数发出的对象引用到运行时作为输出对象。
Flink的DataSet API具有两种不同的Flink runtime 创建或重用输入对象的模式。这种行为影响了用户函数如何与输入和输出对象交互的保证和约束。
以下部分定义了这些规则,并给出编写安全用户功能代码的编码指南。
1,禁用对象重用(DEFAULT)
默认情况下,Flink运行于禁用对象重用的模式下。这种模式,确保在函数调用中时钟接受新的输入对象。对象重用禁用模式提供更好的保证,使用更安全。然而,它具有一定的处理开销,并可能导致更高的Java垃圾收集活动。下表说明了用户功能如何在对象重用禁用模式下访问输入和输出对象。
操作 |
保证和限制 |
读取输入对象 |
在方法调用中,保证输入对象的值不会改变。这包括由Iterable服务的对象。例如,可以安全地收集列表或map中由Iterable提供的输入对象。请注意,方法调用后可能会修改对象。在函数调用中记住对象是不安全的。 |
修改输入对象 |
您可以修改输入对象。 |
发射输入对象 |
您可以发出输入对象。输入对象的值在发出后可能已更改。在输入对象发出后读取是非线程安全的 |
读取输出对象 |
给予收集器或作为方法结果返回的对象可能会更改其值。 读取输出对象是不安全的。 |
修改输出对象 |
您可以在对象发出后修改对象并再次发出。 |
禁用对象重用的编码指导(默认)模式:
---不要在方法调用中记住和读取输入对象。
---发出后不要读取对象。
2,启用对象重用
在对象重用启用模式下,Flink的runtime 最小化对象实例化的数量。这可以提高性能并可以减少Java垃圾收集压力。通过调用ExecutionConfig.enableObjectReuse()来激活对象重用启用模式。下表说明了用户功能如何在对象重用启用模式下访问输入和输出对象。
操作 |
保证和限制 |
读取作为常规方法参数接收的输入对象 |
作为常规方法参数接收的输入对象不会在函数调用中修改。方法调用后可能会修改对象。在函数调用中记住对象是不安全的。 |
从Iterable参数读取输入对象 |
从Iterable接收的输入对象只有在调用next()方法之前才有效。一个Iterable或Iterator可以多次服务于相同的对象实例。记住从Iterable接收的输入对象(例如将它们放在列表或map中)是不安全的。 |
修改输入对象 |
除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(reuse)的输入对象外,您不能修改输入对象。 |
发送输入对象 |
除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(reuse)的输入对象外,您不得发出输入对象。 |
读取输出对象 |
给予收集器或作为方法结果返回的对象可能会更改其值。 读取输出对象是不安全的。 |
修改输出对象 |
您可以修改输出对象并再次发出。 |
启用对象重用的编码指南:
---不要记住来自迭代器的输入对象。
---不要在方法调用中记住和读取输入对象。
---除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外,不要修改或发出输入对象。
---为了减少对象实例,您可以随时发出一个被重复修改但从不读取的专用输出对象。
七,Debugging
在讲flink程序部署到分布式集群处理大数据之前,先验证一下实现逻辑是否满足需求是一个很好的做法。 因此,实施数据分析程序通常是检查结果,调试和改进的增量过程。
Flink提供了一些很好的特性,可以在IDE内部进行数据分析前的本地调试,输入测试数据并返回结果集合。这一章节其实跟前面一篇文章的章节很类似<>。
1,本地执行环境
LocalEnvironment会在创建的同一个JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,可以在代码中设置断点并轻松调试程序。
创建并使用LocalEnvironment,如下所示:
val env = ExecutionEnvironment.createLocalEnvironment()
val lines = env.readTextFile(pathToTextFile)
// build your program
env.execute();
2,集合形式的DataSources 和 DataSinks。
通过创建输入文件和读取输出文件,为数据分析项目程序提供输入并检查其输出是麻烦的。Flink具有由Java集合支持的特殊DataSources 和 DataSinks,以简化测试。一旦程序被测试通过,DataSources 和 DataSinks可以很容易地被读/写到外部数据存储(如HDFS)的DataSources 和 DataSinks替换。
集合数据源可以使用如下:
val env = ExecutionEnvironment.createLocalEnvironment()
// Create a DataSet from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataSet from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataSet from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意:目前,收集数据源要求数据类型和迭代器实现Serializable。此外,收集数据源不能并行执行(parallelism = 1)。
八,语义注释
语义注释可以用来给Flink 关于函数行为的提示。他们告诉系统,函数输入的哪些字段去读取和求值,哪些字段从输入到输出未被修改。语义注释是加速执行的强大手段,因为它们允许系统推理在多个操作中重用排序顺序或分区。使用语义注释,最终将省去不必要的排序和shuffle,极大地改进程序的性能。
注意:使用语义注释是可选的。但是,当提供语义注释时,保守是非常重要的!不正确的语义注释会导致Flink对您的程序做出不正确的假设,最终可能导致错误的结果。如果操作员的行为不可预测,则不应提供注释。
目前支持以下语义注释。
1,Forwarded Fields Annotation
转发字段信息声明一些未修改的字段通过函数转发到相同的位置或者输出中的另一个位置。优化器使用该信息来推断功能是否保留诸如排序或分区之类的数据属性。对于对GroupReduce,GroupCombine,CoGroup和MapPartition等输入元素进行操作的功能,定义为转发字段的所有字段必须始终从相同的输入元素共同转发。对于对输入元素组进行操作的函数,例如GroupReduce,GroupCombine,CoGroup和MapPartition,定义为转发字段的所有字段必须始终从相同的输入元素共同转发。
使用字段表达式指定字段转发信息。转发到输出中相同位置的字段可以由其位置指定。指定的位置必须对输入和输出数据类型有效,并且具有相同的类型。例如,String“f2”声明Java输入元组的第三个字段总是等于输出元组中的第三个字段。
将字段未修改转发到输出中的另一个位置,通过字段表达式的方式指定输入的源字段和输出的目标字段。比如,字符串“f0-f2”表示JAVA输入tuple的第一个元素,无变化的copy到输出JAVA tuple的第三个字段。通配符表达式*可用于指代整个输入或输出类型,即“f0 - > *”表示函数的输出始终等于其Java输入元组的第一个字段。
当指定转发的字段时,不要求声明所有转发的字段,但所有声明必须正确。多个转发的字段可以通过将它们以分号分隔为“f0; f2-> f1; f3-> f2”或单独的字符串“f0”,“f2-> f1”,“f3-> f2”
转发的字段信息可以通过在函数类定义上附加Java注释,或者通过在DataSet上调用函数之后传递为操作符参数来声明,如下所示。
A),函数类注释
a) @ForwardedFields用于单输入函数,如Map和Reduce。
b) @ForwardedFieldsFirst 用于拥有两个输入函数的第一个输入,比如Join和CoGroup。
c) @ForwardedFieldsSecond 对于具有两个输入(如Join和CoGroup)的函数的第二个输入。
B),操作符参数
a) data.map(myMapFnc).withForwardedFields() 用于单输入函数,如Map和Reduce。
b) data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst() 用于拥有两个输入函数的第一个输入,比如Join和CoGroup。
c) data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond() 对于具有两个输入(如Join和CoGroup)的函数的第二个输入。
请注意,无法通过操作符参数覆盖指定为类注释的字段转发信息。
以下示例显示如何使用函数类注释声明转发的字段信息:
@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
def map(value: (Int, Int)): (String, Int, Int) = {
return ("foo", value._2 / 2, value._1)
}
}
2,Non-Forwarded Fields
非转发字段信息声明在函数的输出中不保留在同一位置的所有字段。所有其他字段的值被认为保留在输出中的相同位置。因此,非转发字段信息与转发字段信息相反。GroupReduce,GroupCombine,CoGroup和MapPartition等组操作函数的非转发字段信息必须满足与转发的字段信息相同的要求。
重要信息:非转发字段信息的规范是可选的。 但是如果使用,ALL! 必须指定非转发字段,因为所有其他字段都被视为转发到位。 将转发的字段声明为未转发是安全的。
非转发字段被指定为字段表达式的列表。该列表可以作为单个用分号的字段表达式字符串给出或多个字符串分隔。例如,“f1; f3”和“f1”,“f3”都声明Java元组的第二和第四个字段没有保留在原位,所有其他字段都保留在原位。只能对具有相同输入和输出类型的函数指定非转发字段信息。
未转发的字段信息使用以下注释指定为函数类注释:
A) , @NonForwardedFields 用于单输入函数,如Map和Reduce。
B) ,@NonForwardedFieldsFirst 用于拥有两个输入函数的第一个输入,比如Join和CoGroup。
C) ,@NonForwardedFieldsSecond 对于具有两个输入(如Join和CoGroup)的函数的第二个输入。
以下示例显示如何声明未转发的字段信息:
@NonForwardedFields("_2") // second field is not forwarded
class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
def map(value: (Int, Int)): (Int, Int) = {
return (value._1, value._2 / 2)
}
}
3,Read Fields
读取字段信息声明所有由函数访问和评估的字段,即函数使用的所有字段来计算其结果。例如,在指定读取字段信息时,必须将在条件语句中评估或用于计算的字段标记为已读。只有未经修改的字段转发到输出,而不评估其值或根本不被访问的字段不被视为被读取。
重要信息:读取字段信息的规范是可选的。但是如果使用,ALL!必须指定读取字段。将非读取字段声明为可读取是安全的。
读取字段被指定为字段表达式的列表。该列表可以作为单个字符串给出,字段表达式用分号或多个字符串分隔。例如“f1; f3”和“f1”,“f3”都声明Java元组的第二和第四个字段被该函数读取和计算。
读取字段信息使用以下注释指定为函数类注释:
A), @NonForwardedFields 用于单输入函数,如Map和Reduce。
B),@NonForwardedFieldsFirst 用于拥有两个输入函数的第一个输入,比如Join和CoGroup。
C),@NonForwardedFieldsSecond 对于具有两个输入(如Join和CoGroup)的函数的第二个输入。
以下示例显示如何声明读取字段信息:
@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
def map(value: (Int, Int, Int, Int)): (Int, Int) = {
if (value._1 == 42) {
return (value._1, value._2)
} else {
return (value._4 + 10, value._2)
}
}
}
九,广播变量
广播变量允许您为操作的所有并行实例提供可用的数据集,以及操作的常规输入。 这对于辅助数据集或数据相关参数化是有用的。 然后,数据集将作为集合在操作算子处可访问。
Broadcast:广播集通过名称通过BroadcastSet(DataSet,String)
Access:通过目标运算符的getRuntimeContext()。getBroadcastVariable(String)访问。
// 1. The DataSet to be broadcasted
val toBroadcast = env.fromElements(1, 2, 3)
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
var broadcastSet: Traversable[String] = null
override def open(config: Configuration): Unit = {
// 3. Access the broadcasted DataSet as a Collection
broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
}
def map(in: String): String = {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet
确保在注册和访问广播的数据集时,名称(上一个示例中的broadcastSetName)匹配。 有关完整的示例程序,请查看KMeans算法。后面出文章介绍。
注意:
由于广播变量时保存在每个节点的内存中,所以不应太大。对于简单的比如scalar 变量,可以把它作为闭包的参数,或者使用withParameters(...)方法传入配置。
十,分布式缓存
Flink提供了类似于Apache Hadoop的分布式缓存,可以使用户方法的并行实例在本地访问文件。此功能可用于共享包含静态外部数据(如字典或机器学习回归模型)的文件。
缓存的工作原理如下。程序将其执行环境中的特定名称的本地或远程文件系统(如HDFS或S3)的文件或目录注册为缓存文件。执行程序时,Flink会自动将文件或目录复制到所有worker节点的本地文件系统中。用户方法可以查找指定名称下的文件或目录,并从worker节点的本地文件系统访问它。
分布式缓存使用如下:
1,在ExecutionEnvironment中注册文件或目录。
val env = ExecutionEnvironment.getExecutionEnvironment
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()
2,访问用户函数中缓存的文件(这里是MapFunction)。 该函数必须扩展一个RichFunction类,因为它需要访问RuntimeContext。
// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {
override def open(config: Configuration): Unit = {
// access cached file via RuntimeContext and DistributedCache
val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
// read the file (or navigate the directory)
...
}
override def map(value: String): Int = {
// use content of cached file
...
}
}
十一,将参数传递给函数
参数可以使用构造函数或者withParameters(Configuration)方法传递给函数。 这些参数作为函数对象的一部分进行序列化,并发送到所有并行任务实例。
1,通过构造函数
val toFilter = env.fromElements(1, 2, 3)
toFilter.filter(new MyFilter(2))
class MyFilter(limit: Int) extends FilterFunction[Int] {
override def filter(value: Int): Boolean = {
value > limit
}
}
该方法将一个Configuration对象作为参数,将其传递给rich函数的open()方法。 配置对象是从String键到不同值类型的Map。
val toFilter = env.fromElements(1, 2, 3)
val c = new Configuration()
c.setInteger("limit", 2)
toFilter.filter(new RichFilterFunction[Int]() {
var limit = 0
override def open(config: Configuration): Unit = {
limit = config.getInteger("limit", 0)
}
def filter(in: Int): Boolean = {
in > limit
}
}).withParameters(c)
2,通过withParameters(Configuration)
Flink还允许将自定义配置值传递到环境的ExecutionConfig界面。由于执行配置可在所有(丰富)用户功能中访问,因此自定义配置将在所有功能中全局可用。
设置自定义全局配置:
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)
请注意:
你也可以传递一个自定义的类(该类必须继承ExecutionConfig.GlobalJobParameters 类),将其作为一个全局的job 配置传递给执行配置。该接口允许实现Map <String,String> toMap()方法,这将反过来在前端显示配置中的值。
从全局配置访问值:
全局作业参数中的对象可在系统中的许多位置访问。 实现RichFunction接口的所有用户方法均可通过运行时环境进行访问。
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
private String mykey;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
Configuration globConf = (Configuration) globalParams;
mykey = globConf.getString("mykey", null);
}
以上是关于Flink DataSet编程指南-demo演示及注意事项的主要内容,如果未能解决你的问题,请参考以下文章