From 0 to 1: The Flink Growth Path
Posted by 熊老二-
Data Transformations (Transformation)
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/batch/
API Explanations (English version)
Transformation | Description |
Map | Takes one element and produces one element. data.map { x => x.toInt } |
FlatMap | Takes one element and produces zero, one, or more elements. data.flatMap { str => str.split(" ") } |
MapPartition | Transforms a parallel partition in a single function call. The function gets the partition as an `Iterator` and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism and previous operations. data.mapPartition { in => in map { (_, 1) } } |
Filter | Evaluates a boolean function for each element and retains those for which the function returns true. IMPORTANT: The system assumes that the function does not modify the element on which the predicate is applied. Violating this assumption can lead to incorrect results. data.filter { _ > 1000 } |
Reduce | Combines a group of elements into a single element by repeatedly combining two elements into one. Reduce may be applied on a full data set, or on a grouped data set. data.reduce { _ + _ } |
ReduceGroup | Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set. data.reduceGroup { elements => elements.sum } |
Aggregate | Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set. val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2); You can also use short-hand syntax for minimum, maximum, and sum aggregations. val input: DataSet[(Int, String, Double)] = // [...] val output: DataSet[(Int, String, Double)] = input.sum(0).min(2) |
Distinct | Returns the distinct elements of a data set. It removes the duplicate entries from the input DataSet, with respect to all fields of the elements, or a subset of fields. data.distinct() |
Join | Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys. // In this case tuple fields are used as keys. "0" is the join field on the first tuple // "1" is the join field on the second tuple. val result = input1.join(input2).where(0).equalTo(1) You can specify the way that the runtime executes the join via Join Hints. The hints describe whether the join happens through partitioning or broadcasting, and whether it uses a sort-based or a hash-based algorithm. Please refer to the Transformations Guide for a list of possible hints and an example. If no hint is specified, the system will try to make an estimate of the input sizes and pick the best strategy according to those estimates. // This executes a join by broadcasting the first data set // using a hash table for the broadcasted data val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) .where(0).equalTo(1) Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup. |
OuterJoin | Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys. val joined = left.leftOuterJoin(right).where(0).equalTo(1) { (left, right) => val a = if (left == null) "none" else left._1 (a, right) } |
CoGroup | The two-dimensional variant of the reduce operation. Groups each input on one or more fields and then joins the groups. The transformation function is called per pair of groups. See the keys section to learn how to define coGroup keys. data1.coGroup(data2).where(0).equalTo(1) |
Cross | Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element val data1: DataSet[Int] = // [...] val data2: DataSet[String] = // [...] val result: DataSet[(Int, String)] = data1.cross(data2) Note: Cross is potentially a very compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system with the DataSet sizes by using crossWithTiny() and crossWithHuge(). |
Union | Produces the union of two data sets. data.union(data2) |
Rebalance | Evenly rebalances the parallel partitions of a data set to eliminate data skew. Only Map-like transformations may follow a rebalance transformation. val data1: DataSet[Int] = // [...] val result: DataSet[(Int, String)] = data1.rebalance().map(...) |
Hash-Partition | Hash-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions. val in: DataSet[(Int, String)] = // [...] val result = in.partitionByHash(0).mapPartition { ... } |
Range-Partition | Range-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions. val in: DataSet[(Int, String)] = // [...] val result = in.partitionByRange(0).mapPartition { ... } |
Custom Partitioning | Manually specify a partitioning over the data. Note: This method works only on single field keys. val in: DataSet[(Int, String)] = // [...] val result = in .partitionCustom(partitioner: Partitioner[K], key) |
Sort Partition | Locally sorts all partitions of a data set on a specified field in a specified order. Fields can be specified as tuple positions or field expressions. Sorting on multiple fields is done by chaining sortPartition() calls. val in: DataSet[(Int, String)] = // [...] val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... } |
First-n | Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions, tuple positions or case class fields. val in: DataSet[(Int, String)] = // [...] // regular data set val result1 = in.first(3) // grouped data set val result2 = in.groupBy(0).first(3) // grouped-sorted data set val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3) |
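The grouped operators above are easiest to see on a tiny word-count example. The sketch below mimics the semantics of groupBy(0).reduce and groupBy(0).reduceGroup using plain Scala collections standing in for a DataSet; it is a local illustration of the semantics only, not Flink API code:

```scala
// (word, count) pairs, as the flatMap/map steps of a word count would produce them
val pairs = Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1))

// groupBy(0).reduce { ... }: repeatedly merges two elements of a group into one
val reduced = pairs
  .groupBy(_._1).values
  .map(_.reduce((l, r) => (l._1, l._2 + r._2)))
  .toSeq.sorted

// groupBy(0).reduceGroup { ... }: the function receives the whole group at once
val groupReduced = pairs
  .groupBy(_._1)
  .map { case (word, group) => (word, group.map(_._2).sum) }
  .toSeq.sorted

assert(reduced == Seq(("a", 3), ("b", 2)))
assert(reduced == groupReduced) // same result, different call granularity
```

Both arrive at the same counts; the difference is that Reduce sees two elements at a time (so it must be associative), while ReduceGroup sees the entire group and may emit any number of results.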
API Explanations (Chinese version, translated)
Transformation | Description |
Map | Takes one element and produces a new element. data.map { x => x.toInt } |
FlatMap | Takes one element and produces any number of elements. data.flatMap { str => str.split(" ") } |
MapPartition | Like Map, but transforms an entire parallel partition at once. data.mapPartition { in => in map { (_, 1) } } |
Filter | Keeps an element if the function returns true; otherwise the element is filtered out. data.filter { _ > 100 } |
Reduce | Combines a group of elements into a single element by merging two elements into one at a time. data.reduce { _ + _ } |
ReduceGroup | Combines a group of elements into one or more elements. data.reduceGroup { elements => elements.sum } |
Aggregate | Aggregates a group of values into a single value; aggregation functions can be seen as built-in Reduce functions. data.aggregate(SUM, 0).aggregate(MIN, 2) or data.sum(0).min(2) |
Distinct | Removes duplicate elements. data.distinct() |
Join | Joins two data sets on equal keys. input1.join(input2).where(0).equalTo(1) The join strategy can also be chosen: partitioning or broadcasting, a sort-based or a hash-based algorithm. input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(1) |
OuterJoin | Outer join, including left, right, and full outer joins. left.leftOuterJoin(right).where(0).equalTo(1) { (left, right) => ... } |
CoGroup | The two-dimensional variant of the Reduce operation: groups each input on one or more fields, then joins the groups. input1.coGroup(input2).where(0).equalTo(1) |
Cross | Cartesian product. input1.cross(input2) |
Union | Union of two data sets. input1.union(input2) |
Rebalance | Rebalances the partitions to eliminate data skew. input.rebalance() |
Hash-Partition | Partitions by hash. input.partitionByHash(0) |
Range-Partition | Partitions by range. input.partitionByRange(0) |
Custom Partitioning | Custom partitioning. input.partitionCustom(partitioner: Partitioner[K], key) |
First-n | Returns the first n elements of the data set. input.first(3) |
partitionByHash | Hash-partitions on the specified key. |
sortPartition | Sorts the data within each partition on the specified field. |
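The difference between the Join and OuterJoin rows above can likewise be sketched with plain collections. This is again only a local simulation of the where(0).equalTo(1) key matching, not Flink API code:

```scala
// left keyed on field 0, right keyed on field 1 (as in where(0).equalTo(1))
val left  = Seq((1, "alpha"), (2, "beta"), (3, "gamma"))
val right = Seq(("x", 1), ("y", 2), ("y", 1))

// Inner join: every pair of elements that are equal on their keys
val inner = for { l <- left; r <- right; if l._1 == r._2 } yield (l, r)
assert(inner.size == 3) // key 1 matches twice, key 2 once, key 3 not at all

// Left outer join: unmatched left records are kept, with null on the right side
val leftOuter = left.flatMap { l =>
  val matches = right.filter(_._2 == l._1)
  if (matches.isEmpty) Seq((l, null: (String, Int))) else matches.map(r => (l, r))
}
assert(leftOuter.size == 4)
assert(leftOuter.contains(((3, "gamma"), null))) // the unmatched record survives
```

This mirrors the documented behavior: the inner join drops (3, "gamma") because no right element carries key 3, while leftOuterJoin preserves it and hands the JoinFunction a null for the missing side.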