Flink并行度

Posted 浪尖聊大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink并行度相关的知识,希望对你有一定的参考价值。

并行执行

本节介绍如何在Flink中配置程序的并行执行。FLink程序由多个任务(转换/操作符、数据源和sinks)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行性。


如果要使用保存点,还应该考虑设置最大并行性(或最大并行性)。当从保存点还原时,可以改变特定运算符或整个程序的并行性,并且该设置指定并行性的上限。这是必需的,因为FLINK内部将状态划分为key-groups,并且我们不能拥有+INF的key-group数,因为这将对性能有害。

Flink中人物的并行度可以从多个不同层面设置:

1, 操作算子层面

2, 执行环境层面‘

3, 客户端层面

4, 系统层面

5,设置slots

操作算子层

操作算子,数据源,数据接收器等这些并行度都可以通过调用他们的setParallelism()方法置。例如

val env = StreamExecutionEnvironment.getExecutionEnvironment


val text = [...]

val wordCounts = text

   .flatMap{ _.split(" ") map { (_, 1) } }

   .keyBy(0)

   .timeWindow(Time.seconds(5))

   .sum(1).setParallelism(5)

wordCounts.print()


env.execute("Word Count Example")

执行环

flink程序行需要境上下文。其要行的操作算子,数据源,数据sinks都是置了默的并行度。境的并行度可以通操作算子示指定并行度来覆盖掉。

境并行度可以通过调用setParallelism()来置。例如,操作算子,数据源,数据接收器,并行度都3,那么在面,置方式如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(3)


val text = [...]

val wordCounts = text

   .flatMap{ _.split(" ") map { (_, 1) } }

   .keyBy(0)

   .timeWindow(Time.seconds(5))

   .sum(1)

wordCounts.print()


env.execute("Word Count Example")

在提交job flink候,在客也可以flink的并行度。客端即可以是java工程,也可以是scala工程。Flink的Command-line Interface (CLI)就是这样一种客

在客flink可以通-p参数来置并行度。例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

java/scala端,并行度置方式如下

try {

   PackagedProgram program = new PackagedProgram(file, args)

   InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")

   Configuration config = new Configuration()


   Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())


   // set the parallelism to 10 here

   client.run(program, 10, true)


} catch {

   case e: Exception => e.printStackTrace

}

统层

统层面的并行度置,会针对所有的境生效,可以通过parallelism.default,属性在conf/flink-conf.yaml文件中设置。


置最大并行度

置最大并行度,实际调用的方法是setMaxParallelism(),其调用位置和setParallelism()一样。

默认的最大并行度是近似于operatorParallelism + (operatorParallelism / 2),下限是127,上线是32768.


得注意的是将最大的并行的设置为大的数可能会性能造成不利的影响,因一些状后端是必要保存内部数据构的,个数据构跟key-group数量相匹配(是可重定状的内部实现机制)。

配置taskmanagerslot

flink通过将项目分成tasks,来实现并行的执行项目,划分的tasks会被发到slot去处理。

集群中Flinktaskmanager提供slot。Slots数量最合适的是跟taskmanager的cores数量成正比。当然,taskmanager.numberOfTaskSlots的推荐就是cpu核心的数目。

当启一个任候,我可以其提供默的slot数目,其也即是flink工程的并行度,置方式在上面已详细


推荐阅读


以上是关于Flink并行度的主要内容,如果未能解决你的问题,请参考以下文章

Flink并行度可以有如下几种指定方式

Flink并行度

Flink并行度

Flink中并行度相关问题

Flink的并行度及Slot

1.21.Flink Slot和并行度(parallelism)Flink的并行度由什么决定的?Flink的task是什么?slot和parallelism