DataStream API
Posted JiaXingNashishua
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DataStream API相关的知识,希望对你有一定的参考价值。
目录
5.3.4 物理分区(Physical Partitioning)
5.3.4 物理分区(Physical Partitioning)
防止数据倾斜,保证各个分区的负载均衡,实现真正的数据分区。
“分区”(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区 去进行下一步处理。其实我们对分区操作并不陌生,前面介绍聚合算子时,已经提到了 keyBy, 它就是一种按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制的——所以我们有时也说,keyBy 是一种逻辑分区(logical partitioning)操作。
如果说 keyBy 这种逻辑分区是一种“软分区”,那真正硬核的分区就应该是所谓的“物理分区”(physical partitioning)。也就是我们要真正控制分区策略,精准地调配数据,告诉每个 数据到底去哪里。其实这种分区方式在一些情况下已经在发生了:例如我们编写的程序可能对 多个处理任务设置了不同的并行度,那么当数据执行的上下游任务并行度变化时,数据就不应该还在当前分区以直通(forward)方式传输了——因为如果并行度变小,当前分区可能没有下游任务了;而如果并行度变大,所有数据还在原先的分区处理就会导致资源的浪费。所以这种情况下,系统会自动地将数据均匀地发往下游所有的并行任务,保证各个分区的负载均衡。
1. 随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法,将数据随 机地分配到下游算子的并行任务中去。随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地 传递到下游任务分区,如图所示。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。经过随机分区之后,得到的依然是一个 DataStream。
2. 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图 所示。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
注:Round-Robin 算法用在了很多地方,例如 Kafka 和 nginx。
3. 重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就 是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节点的网络传输必然影响效率;而如果我们配置的 task slot 数量合适,用 rescale 的方式进行“局 部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络传输带来的损耗。
4. 广播(broadcast)
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一 份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
5. 全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
6. 自定义分区(Custom)
当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 ,我们可以通过使用partitionCustom()方法来自定义分区策略。
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个 是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定, 也可以通过字段位置索引来指定,还可以实现一个 KeySelector。
上述分区的代码实现:
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import scala.Int;
public class TransFromPartitionTest
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream=env.fromElements(
new Event("Bob","./cart",2000L),
new Event("Alice","./prod?id=100",3000L),
new Event("Bob","./prod?id=1",3300L),
new Event("Bob","./home",3500L),
new Event("Alice","./home",3700L),
new Event("Bob","./prod?id=2",3800L),
new Event("Bob","./prod?id=3",4200L),
new Event("Alice","./prod?id=2",5500L)
);
//1.均匀分配到4个分区,随机分区
// stream.shuffle().print().setParallelism(4);
//2.轮询分区:4,1,2,3
//stream.rebalance().print().setParallelism(4);
//3.rescale 重缩放分区
/* env.addSource(new RichParallelSourceFunction<Integer>()
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception
for(int i=0;i<8;i++)
//将奇偶数分别发送到0号和1号并进行分区
if(i%2== getRuntimeContext().getIndexOfThisSubtask())
sourceContext.collect(i);
@Override
public void cancel()
).setParallelism(2)
.rescale()
.print()
.setParallelism(4);*/
//4.广播
// stream.broadcast().print().setParallelism(4);
//5.全局分区,相当于把所有数据合并到1分区
// stream.global().print().setParallelism(4);
//6.自定义重分区
env.fromElements(1,2,3,4,5,6,7,8)
.partitionCustom(new Partitioner<Integer>()
@Override
public int partition(Integer key, int numPartitions)
return key%2;
, new KeySelector<Integer, Integer>()
@Override
public Integer getKey(Integer value) throws Exception
return value;
)
.print().setParallelism(4);
env.execute();
5.4 输出算子(Sink)
Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供 支持,如图 所示,本节将主要讲解 Flink 中的 Sink 操作。我们已经了解了 Flink 程序如何 对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。
5.4.1 连接到外部系统
在 Flink 中,如果我们希望将数据写入外部系统,其实并不是一件难事。我们知道所有算 子都可以通过实现函数类来自定义处理逻辑,所以只要有读写客户端,与外部系统的交互在任 何一个处理算子中都可以实现。例如在 MapFunction 中,我们完全可以构建一个到 Redis 的连 接,然后将当前处理的结果保存到 Redis 中。如果考虑到只需建立一次连接,我们也可以利用
RichMapFunction,在 open() 生命周期中做连接操作。
这样看起来很方便,却会带来很多问题。Flink 作为一个快速的分布式实时流处理系统, 对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果 的正确性。这种性质一般被称作“状态一致性”。Flink 内部提供了一致性检查点(checkpoint) 来保障我们可以回滚到正确的状态;但如果我们在处理过程中任意读写外部系统,发生故障后 就很难回退到从前了。
为了避免这样的问题,Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外 部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink算子完成的。 Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论 怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统 一把它直观地叫作“输出算子”。
之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink。
与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是 通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入 一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:
default void invoke(IN value, Context context) throws Exception
当然,SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框 架的 Sink 连接器。
我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连 接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一 些其他第三方系统与 Flink 的连接器
除此以外,就需要用户自定义实现 sink 连接器了。接下来,我们就选取一些常见的外部系统进行展开讲解。
5.4.2 输出到文件
最简单的输出方式,当然就是写入文件了。对应着读取文件作为输入数据源,Flink 本来 也有一些非常简单粗暴的输出到文件的预实现方法:如 writeAsText()、writeAsCsv(),可以直 接将输出结果保存到文本文件或 Csv 文件。但我们知道,这种方式是不支持同时写入一份文 件的;所以我们往往会将最后的 Sink 操作并行度设为 1,这就大大拖慢了系统效率;而且对 于故障恢复后的状态一致性,也没有任何保证。所以目前这些简单的方法已经要被弃用。 Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类
RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。 StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink
支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。 它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分 区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶” 的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保 存的文件,记录的都是 1 小时的输出数据。 StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet) 格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用
StreamingFileSink 的静态方法:
⚫ 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
⚫ 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径 (basePath)和数据的编码逻辑(rowEncoder 或bulkWriterFactory)。
代码实现:
package com.atguigu.chapter05;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
public class SinkToFileTest
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
//全局设置分区为4,所以最后会写入到4个文件中
env.setParallelism(4);
DataStreamSource<Event> stream=env.fromElements(
new Event("Bob","./cart",2000L),
new Event("Alice","./prod?id=100",3000L),
new Event("Bob","./prod?id=1",3300L),
new Event("Bob","./home",3500L),
new Event("Alice","./home",3700L),
new Event("Bob","./prod?id=2",3800L),
new Event("Bob","./prod?id=3",4200L),
new Event("Alice","./prod?id=2",5500L)
);
StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(1024 * 1024 * 1024)
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
)
.build();
//把要处理的数据从event转化为string,然后在进行输出
stream.map(data -> data.toString())
.addSink(streamingFileSink);
env.execute();
这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面 的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
⚫ 至少包含 15 分钟的数据
⚫ 最近 5 分钟没有收到新的数据
⚫ 文件大小已达到 1 GB
5.4.3 输出到 Kafka
Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。
如果仅仅是支持读写,那还说明 不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。
下面我们要实现从kafka消费数据,然后再使用flink处理数据,最后输出到kafka中。
(一)编写代码:
package com.atguigu.chapter05;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
public class SinkToFileTest
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
//全局设置分区为4,所以最后会写入到4个文件中
env.setParallelism(4);
DataStreamSource<Event> stream=env.fromElements(
new Event("Bob","./cart",2000L),
new Event("Alice","./prod?id=100",3000L),
new Event("Bob","./prod?id=1",3300L),
new Event("Bob","./home",3500L),
new Event("Alice","./home",3700L),
new Event("Bob","./prod?id=2",3800L),
new Event("Bob","./prod?id=3",4200L),
new Event("Alice","./prod?id=2",5500L)
);
StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(1024 * 1024 * 1024)
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
)
.build();
//把要处理的数据从event转化为string,然后在进行输出
stream.map(data -> data.toString())
.addSink(streamingFileSink);
env.execute();
(二)启动zookeeper和kafka
zk.sh start
kf.sh start
(三)生成kafka生产者(数据源)
bin/kakfa-console-producer.sh --broker-list localhost:9092 --topic clicks
(四)在 Linux 主机启动一个消费者, 查看是否收到数据(输出数据)
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic events
5.4.4 输出到 mysql(JDBC)
关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据 存储的主要形式。MySQL 就是其中的典型代表。尽管在大数据处理中直接与 MySQL 交互的 场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往 往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。
(1)导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
(2)编写代码
package com.atguigu.chapter05;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinkToMySql
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream=env.fromElements(
new Event("Bob","./cart",2000L),
new Event("Alice","./prod?id=100",3000L),
new Event("Bob","./prod?id=1",3300L),
new Event("Bob","./home",3500L),
new Event("Alice","./home",3700L),
new Event("Bob","./prod?id=2",3800L),
new Event("Bob","./prod?id=3",4200L),
new Event("Alice","./prod?id=2",5500L)
);
stream.addSink(JdbcSink.sink(
"INSERT INTO clicks (user,url) VALUES(?,?)",
((statement,event) ->
statement.setString(1,event.user);
statement.setString(2,event.url);
),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("root")
.build()
));
env.execute();
(3)打开MySQL服务,创建对应的数据库和表
(4)运行代码,用客户端连接 MySQL,查看是否成功写入数据。
mysql> select * from clicks;
+------+--------------+
| user | url |
+------+--------------+
| Mary | ./home |
| Alice| ./prod?id=300 |
| Bob | ./prod?id=3 |
+------+---------------+
3 rows in set (0.00 sec)
5.4.5 自定义 Sink 输出
如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外 部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction,现在既然没有 现成的,我们就只好自力更生了。例如,Flink 并没有提供 HBase 的连接器,所以需要我们自 己写。
在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。我们这里使用了 SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念, 创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。
(1)导入依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>$hbase.version</version>
</dependency>
(2)编写输出到 HBase 的示例代码
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import java.nio.charset.StandardCharsets;
public class SinkCustomtoHBase
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.fromElements("hello", "world")
.addSink(
new RichSinkFunction<String>()
public org.apache.hadoop.conf.Configuration
configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入
public Connection connection; // 管理 Hbase 连接
@Override
public void open(Configuration parameters) throws
Exception
super.open(parameters);
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum",
"hadoop102:2181");
connection =
ConnectionFactory.createConnection(configuration);
@Override
public void invoke(String value, Context context) throws
Exception
Table table =
connection.getTable(TableName.valueOf("test")); // 表名为 test
Put put = new
Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
put.addColumn("info".getBytes(StandardCharsets.UTF_8) // 指定列名
, value.getBytes(StandardCharsets.UTF_8) // 写
入的数据
, "1".getBytes(StandardCharsets.UTF_8)); // 写
入的数据
table.put(put); // 执行 put 操作
table.close(); // 将表关闭
@Override
public void close() throws Exception
super.close();
connection.close(); // 关闭连接
);
env.execute();
(3)可以在 HBase 查看插入的数据。
5.5 本章总结
本章从编写 Flink 程序的基本流程入手,依次讲解了执行环境的创建、数据源的读取、数 据流的转换操作,和最终结果数据的输出,对各种常见的转换操作 API 和外部系统的连接都 做了详细介绍,并在其中穿插阐述了 Flink 中支持的数据类型和 UDF 的用法。我们可以自信 地说,到目前为止已经充分掌握了 DataStream API 的基本用法,熟悉了 Flink 的编程习惯,应 该说已经真正跨进了 Flink 流处理的大门。
当然,本章对于转换算子只是一个简单介绍,Flink 中的操作远远不止这些,还有窗口 (Window)、多流转换、底层的处理函数(Process Function)以及状态编程等更加高级的用法。 另外本章中由于涉及读写外部系统,我们不只一次地提到了“精确一次(exactly once)”的状 态一致性,这也是 Flink 的高级特性之一。
以上是关于DataStream API的主要内容,如果未能解决你的问题,请参考以下文章
Apache Flink -Streaming(DataStream API)
1.18.2.8与DataStream和DataSet API结合,Scala隐式转换,通过DataSet或DataStream创建视图,将DataStream或DataSet转换成表 等