From 0 to 1: The Flink Growth Path

Posted by 熊老二


Data Transformations (Transformation)

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/

API Reference (English)

Map: Takes one element and produces one element.
data.map { x => x.toInt }
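
A minimal runnable sketch of map on the Scala DataSet API (Flink 1.11; the object name and sample data are illustrative):

import org.apache.flink.api.scala._

object MapDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Parse each string into an Int: one input element, one output element
    val data: DataSet[String] = env.fromElements("1", "2", "3")
    val ints: DataSet[Int] = data.map { x => x.toInt }
    ints.print() // 1, 2, 3
  }
}
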
FlatMap: Takes one element and produces zero, one, or more elements.
data.flatMap { str => str.split(" ") }
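
The classic line-splitting use of flatMap, as a self-contained sketch (names and data are illustrative):

import org.apache.flink.api.scala._

object FlatMapDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Each input line yields zero or more words
    val lines: DataSet[String] = env.fromElements("hello flink", "hello world")
    val words: DataSet[String] = lines.flatMap { str => str.split(" ") }
    words.print() // hello, flink, hello, world
  }
}
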
MapPartition: Transforms a parallel partition in a single function call. The function gets the partition as an `Iterator` and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree of parallelism and previous operations.
data.mapPartition { in => in map { (_, 1) } }
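
A sketch showing that the function runs once per partition over an Iterator, not once per element (illustrative data):

import org.apache.flink.api.scala._

object MapPartitionDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words: DataSet[String] = env.fromElements("spark", "flink", "hadoop")
    // Invoked once per partition; the Iterator covers all elements of that partition
    val pairs: DataSet[(String, Int)] = words.mapPartition { in => in.map { (_, 1) } }
    pairs.print() // (spark,1), (flink,1), (hadoop,1)
  }
}
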
Filter: Evaluates a boolean function for each element and retains those for which the function returns true.
IMPORTANT: The system assumes that the function does not modify the element on which the predicate is applied. Violating this assumption can lead to incorrect results.
data.filter { _ > 1000 }
Reduce: Combines a group of elements into a single element by repeatedly combining two elements into one. Reduce may be applied on a full data set, or on a grouped data set.
data.reduce { _ + _ }
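
A sketch of reduce on a full data set and on a grouped data set (sample data is illustrative; print order may vary):

import org.apache.flink.api.scala._

object ReduceDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // On the full data set: sum all elements pairwise
    val nums: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)
    nums.reduce { _ + _ }.print() // 15
    // On a grouped data set: sum counts per key
    val counts: DataSet[(String, Int)] = env.fromElements(("a", 1), ("b", 1), ("a", 1))
    counts.groupBy(0).reduce { (l, r) => (l._1, l._2 + r._2) }.print() // (a,2), (b,1)
  }
}
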
ReduceGroup: Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.
data.reduceGroup { elements => elements.sum }
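
A runnable sketch of reduceGroup receiving the whole group as an Iterator (illustrative data):

import org.apache.flink.api.scala._

object ReduceGroupDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val nums: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)
    // The group arrives as one Iterator; emit a single sum for it
    val total: DataSet[Int] = nums.reduceGroup { elements => elements.sum }
    total.print() // 15
  }
}
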
Aggregate: Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
You can also use short-hand syntax for minimum, maximum, and sum aggregations.
val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
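
A sketch of the aggregate calls above on concrete data (illustrative; note that non-aggregated fields of the result row come from one arbitrary input row):

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation.Aggregations.{MIN, SUM}

object AggregateDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input: DataSet[(Int, String, Double)] =
      env.fromElements((1, "a", 3.0), (2, "b", 1.5), (3, "c", 2.5))
    // SUM over field 0, then MIN over field 2 of the aggregated result
    val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
    output.print() // one summary row; field 0 holds the sum (6)
    // Short-hand syntax for the same aggregations
    input.sum(0).min(2).print()
  }
}
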
Distinct: Returns the distinct elements of a data set. It removes the duplicate entries from the input DataSet, with respect to all fields of the elements, or a subset of fields.
data.distinct()
Join: Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)
You can specify the way that the runtime executes the join via Join Hints. The hints describe whether the join happens through partitioning or broadcasting, and whether it uses a sort-based or a hash-based algorithm. Please refer to the Transformations Guide for a list of possible hints and an example. If no hint is specified, the system will try to make an estimate of the input sizes and pick the best strategy according to those estimates.
// This executes a join by broadcasting the first data set
// using a hash table for the broadcasted data
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
  .where(0).equalTo(1)
Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
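
A self-contained sketch of an equi-join with a join hint and a JoinFunction (data sets and names are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val scores: DataSet[(Int, Double)] = env.fromElements((1, 90.0), (2, 75.5))
    val names: DataSet[(String, Int)] = env.fromElements(("Tom", 1), ("Jack", 2))
    // Equi-join field 0 of the first input against field 1 of the second,
    // broadcasting the (small) first input; the JoinFunction merges each pair
    val result: DataSet[(String, Double)] =
      scores.join(names, JoinHint.BROADCAST_HASH_FIRST)
        .where(0).equalTo(1) { (score, name) => (name._1, score._2) }
    result.print() // (Tom,90.0), (Jack,75.5)
  }
}
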
OuterJoin: Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
  (left, right) =>
    val a = if (left == null) "none" else left._1
    (a, right)
}
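
A runnable left-outer-join sketch; the right side arrives as null when a key has no match (illustrative data):

import org.apache.flink.api.scala._

object OuterJoinDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val left: DataSet[(String, Int)] = env.fromElements(("a", 1), ("b", 2))
    val right: DataSet[(Int, Double)] = env.fromElements((1, 0.5))
    // ("b", 2) has no matching key in `right`, so `r` is null for it
    val joined: DataSet[(String, Double)] =
      left.leftOuterJoin(right).where(1).equalTo(0) { (l, r) =>
        val score = if (r == null) -1.0 else r._2
        (l._1, score)
      }
    joined.print() // (a,0.5), (b,-1.0)
  }
}
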
CoGroup: The two-dimensional variant of the reduce operation. Groups each input on one or more fields and then joins the groups. The transformation function is called per pair of groups. See the keys section to learn how to define coGroup keys.
data1.coGroup(data2).where(0).equalTo(1)
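
A sketch that counts group sizes per key on both sides; unlike join, a key present in only one input still produces a call, with an empty Iterator on the other side (illustrative data):

import org.apache.flink.api.scala._

object CoGroupDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data1: DataSet[(Int, String)] = env.fromElements((1, "a"), (1, "b"), (2, "c"))
    val data2: DataSet[(Int, Double)] = env.fromElements((1, 0.5), (3, 1.5))
    // Called once per key, with both groups delivered as Iterators
    val result: DataSet[(Int, Int, Int)] =
      data1.coGroup(data2).where(0).equalTo(0) { (first, second) =>
        val l = first.toList
        val r = second.toList
        val key = (l.map(_._1) ++ r.map(_._1)).head
        (key, l.size, r.size) // (key, group size left, group size right)
      }
    result.print() // (1,2,1), (2,1,0), (3,0,1)
  }
}
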
Cross: Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element.
val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)
Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system with the DataSet sizes by using crossWithTiny() and crossWithHuge().
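
A tiny runnable cross sketch; keep inputs small, since the result size is |data1| x |data2| (illustrative data; print order may vary):

import org.apache.flink.api.scala._

object CrossDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data1: DataSet[Int] = env.fromElements(1, 2)
    val data2: DataSet[String] = env.fromElements("a", "b")
    // Every element of data1 paired with every element of data2
    val result: DataSet[(Int, String)] = data1.cross(data2)
    result.print() // (1,a), (1,b), (2,a), (2,b)
  }
}
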
Union: Produces the union of two data sets.
data.union(data2)
Rebalance: Evenly rebalances the parallel partitions of a data set to eliminate data skew. Only Map-like transformations may follow a rebalance transformation.
val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
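
A sketch where a selective filter skews the partitions and rebalance() redistributes the surviving elements round-robin before the next map (illustrative data):

import org.apache.flink.api.scala._

object RebalanceDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // The filter can leave some partitions nearly empty
    val skewed: DataSet[Long] = env.generateSequence(1, 1000).filter(_ > 900)
    // rebalance() evens the partitions out again for the following map
    val result: DataSet[(Long, Int)] = skewed.rebalance().map { x => (x, 1) }
    result.print()
  }
}
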
Hash-Partition: Hash-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
Range-Partition: Range-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
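
A sketch contrasting partitionByHash and partitionByRange by printing per-partition element counts (parallelism and data are illustrative):

import org.apache.flink.api.scala._

object PartitionDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val in: DataSet[(Int, String)] =
      env.fromElements((1, "a"), (2, "b"), (3, "c"), (4, "d"))
    // Hash-partition on field 0: equal keys always land in the same partition
    val hashed = in.partitionByHash(0).mapPartition { it => Iterator(it.size) }
    hashed.print() // element count of each partition
    // Range-partition on field 0: each partition holds a contiguous key range
    val ranged = in.partitionByRange(0).mapPartition { it => Iterator(it.size) }
    ranged.print()
  }
}
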
Custom Partitioning: Manually specify a partitioning over the data.
Note: This method works only on single field keys.
val in: DataSet[(Int, String)] = // [...]
val result = in
  .partitionCustom(partitioner: Partitioner[K], key)
Sort Partition: Locally sorts all partitions of a data set on a specified field in a specified order. Fields can be specified as tuple positions or field expressions. Sorting on multiple fields is done by chaining sortPartition() calls.
val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
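
A runnable sortPartition sketch; with parallelism 1 there is a single partition, so the local sort is effectively a total sort (illustrative data):

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

object SortPartitionDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val in: DataSet[(Int, String)] = env.fromElements((3, "c"), (1, "a"), (2, "b"))
    // Sort each partition on field 1 ascending; chain sortPartition() for more fields
    val sorted = in.sortPartition(1, Order.ASCENDING)
    sorted.print() // (1,a), (2,b), (3,c)
  }
}
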
First-n: Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions, tuple positions, or case class fields.
val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
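
A sketch of the three first(n) variants on concrete data (illustrative; which elements the ungrouped variant returns is arbitrary):

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

object FirstNDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val in: DataSet[(Int, String)] =
      env.fromElements((1, "b"), (1, "a"), (2, "d"), (2, "c"), (2, "e"))
    in.first(3).print()            // any 3 elements
    in.groupBy(0).first(2).print() // up to 2 elements per key
    // the 2 smallest elements per key, ordered by field 1
    in.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }
}
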

API Quick Reference

Map: Takes one element and produces a new element. data.map { x => x.toInt }
FlatMap: Takes one element and produces any number of elements. data.flatMap { str => str.split(" ") }
MapPartition: Like Map, but maps an entire parallel partition at a time. data.mapPartition { in => in map { (_, 1) } }
Filter: Keeps an element if the function returns true, otherwise filters it out. data.filter { _ > 100 }
Reduce: Merges a group of elements into one by repeatedly combining two elements into one. data.reduce { _ + _ }
ReduceGroup: Merges a group of elements into one or more elements. data.reduceGroup { elements => elements.sum }
Aggregate: Aggregates a group of values into a single value; aggregation functions can be seen as built-in reduce functions. data.aggregate(SUM, 0).aggregate(MIN, 2) or data.sum(0).min(2)
Distinct: Deduplicates. data.distinct()
Join: Joins two data sets on a common key: input1.join(input2).where(0).equalTo(1). A join strategy can also be chosen (partitioning vs. broadcasting, sort-based vs. hash-based): input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(1)
OuterJoin: Outer joins, including left outer, right outer, and full outer. left.leftOuterJoin(right).where(0).equalTo(1) { (left, right) => ... }
CoGroup: The two-dimensional variant of the Reduce operation; groups each input on one or more fields, then joins the groups. input1.coGroup(input2).where(0).equalTo(1)
Cross: Cartesian product. input1.cross(input2)
Union: Union of two data sets. input1.union(input2)
Rebalance: Rebalances partitions to eliminate data skew. input.rebalance()
Hash-Partition: Partitions by hash. input.partitionByHash(0)
Range-Partition: Partitions by range. input.partitionByRange(0)
Custom Partitioning: User-defined partitioning. input.partitionCustom(partitioner: Partitioner[K], key)
First-n: Returns the first n elements of a data set. input.first(3)
partitionByHash: Hash-partitions on the specified key.
sortPartition: Sorts the data within each partition on the specified field.
