Flink性能调优总结
Posted Flink
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink性能调优总结相关的知识,希望对你有一定的参考价值。
-Xloggc:<LOG_DIR>/gc.log
-XX:+PrintGCDetails
-XX:-OmitStackTraceInFastThrow
-XX:+PrintGCTimeStamps
-XX:+PrintGCDateStamps
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=20
-XX:GCLogFileSize=20M
优化GC
开发Flink应用程序时,优化DataStream的数据分区或分组操作。
2 设置并行度
算子层次
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
执行环境层次
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
客户端层次
系统层次
3.配置进程参数
在使用yarn-session命令时,添加“-jm MEM”参数设置内存。
在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。
在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。
在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。
在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。
在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。
将在使用yarn-sesion命令时,添加“-tm MEM”参数设置内存。
将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。
设计分区方法
随机分区:将元素随机地进行分区。dataStream.shuffle();
Rebalancing (Round-robin partitioning):基于round-robin对元素进行分区,使得每个分区负责均衡。对于存在数据倾斜的性能优化是很有用的。dataStream.rebalance();
Rescaling:以round-robin的形式将元素分区到下游操作的子集中。如果你想要将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载,但是又不想要完全rebalance 介入(引入rebalance()),这会非常有用。dataStream.rescale();
广播:广播每个元素到所有分区。dataStream.broadcast();
自定义分区:使用一个用户自定义的Partitioner对每一个元素选择目标task,由于用户对自己的数据更加熟悉,可以按照某个特征进行分区,从而优化任务执行。简单示例如下所示:
// fromElements构造简单的Tuple2流
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
// 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
@Override
public int partition(Tuple2<String, Integer> key, int numPartitions) {
return (key.f0.length() + key.f1) % numPartitions;
}
};
// 使用Tuple2进行分区的key值
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
return value;
}
}).print();
配置netty网络通信
“taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp _ [rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。
“taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
解决数据倾斜
需要重新设计key,以更小粒度的key使得task大小合理化。
修改并行度。
调用rebalance操作,使数据分区均匀。
由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。示例可以参考如下:env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
以上是关于Flink性能调优总结的主要内容,如果未能解决你的问题,请参考以下文章