使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响
Posted 杀智勇双全杀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响相关的知识,希望对你有一定的参考价值。
概述
之前测试过单虚拟机(分配2Core和6G内存)跑Spark进行WordCount需要50s左右,相对应3虚拟机(分配2Core和6G内存,放2个SSD中提高IO性能)跑MapReduce(Hadoop2.7.5)进行WordCount需要3分钟左右,提速很明显。。。今天再试试Flink速度怎么样。。。
准备工作
Maven工程准备
idea中创建项目,导入依赖包:
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.10.0</flink.version>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<!-- 依赖Scala语言 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Apache Flink 的依赖, 这些依赖项,不应该打包到JAR文件中. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink操作hdfs,所需要导入该包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- 添加logging框架, 在IDE中运行时生成控制台输出. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 可以设置jar包的入口类(可选) -->
<!--
<mainClass>com.aa.FlinkWordCount</mainClass>
-->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
测试数据准备
使用老早的wc.txt
,文件大小370M,数据量:
(hue,17280000)
(spark,17280000)
(hbase,11520000)
(hadoop,10848000)
(hive,5760000)
与之前一致。
Scala程序编写
package com.aa.start
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
/**
* 使用Scala语言调用Flink框架实现词频统计:WordCount
*/
object FlinkWordCount {
def main(args: Array[String]): Unit = {
// 执行环境-env
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
var core:Int = 7
env.setParallelism(core)
// 数据源-source
val inputDataSet: DataSet[String] = env.readTextFile("datas/wc.txt")// 文件大小370M
val startTime: Long = System.currentTimeMillis()
// 数据转换-transformation
val resultDataSet: DataSet[(String, Int)] = inputDataSet
// 先过滤
.filter(line => null != line && line.trim.length > 0)
// 切分为单词
.flatMap(line => line.trim.split("\\\\W+"))
// 转换为二元组
.map(word => word -> 1)
// 分组求和
.groupBy(0).sum(1)
// 降序排序
.sortPartition(1, Order.DESCENDING)
// 数据终端-sink
resultDataSet.printToErr() // 输出为红色
/*
(hue,17280000)
(spark,17280000)
(hbase,11520000)
(hadoop,10848000)
(hive,5760000)
*/
val endTime: Long = System.currentTimeMillis()
println(s"调用 ${core} Core,用时 : ${endTime}-${startTime} = ${endTime-startTime} ms")
// 触发执行-execute
// 批处理不需要触发也能执行
}
}
执行
分多次,修改Core值后得出结果:
调用 8 Core,用时 : 1626513986430-1626513966494 = 19936 ms
调用 7 Core,用时 : 1626511920251-1626511898580 = 21671 ms
调用 4 Core,用时 : 1626511982015-1626511957206 = 24809 ms
调用 2 Core,用时 : 1626512053248-1626512021101 = 32147 ms
调用 1 Core,用时 : 1626512124226-1626512072327 = 51899 ms
分析结果
可以看出,并行度1和2还是有本质区别的。随后提高并行度并不是线性关系。在本机环境运行,有后台程序吃CPU,读写IO也需要时间,很难让Flink吃完100%的CPU。不过大体上还是能够通过提高并行度的方式来提高多核CPU的运算速度。
可以把Parallelism
并行度设置为CPU核数以确保跑满CPU,带超线程
的CPU(例如笔者的老爷机是I7-6700HQ,只有4核8线程)需要设置为CPU核数的2倍。高于此数值时/还有其余后台程序吃CPU,如果CPU已经跑满100%,性能提升当然不明显。
以上是关于使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响的主要内容,如果未能解决你的问题,请参考以下文章