使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响

Posted 杀智勇双全杀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响相关的知识,希望对你有一定的参考价值。

使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响

概述

之前测试过单虚拟机(分配2Core和6G内存)跑Spark进行WordCount需要50s左右,相对应3虚拟机(分配2Core和6G内存,放2个SSD中提高IO性能)跑MapReduce(Hadoop2.7.5)进行WordCount需要3分钟左右,提速很明显。。。今天再试试Flink速度怎么样。。。

准备工作

Maven工程准备

idea中创建项目,导入依赖包:

<repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.10.0</flink.version>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- 依赖Scala语言 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Apache Flink 的依赖, 这些依赖项,不应该打包到JAR文件中. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink操作hdfs,所需要导入该包-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.74</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- 添加logging框架, 在IDE中运行时生成控制台输出. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 可以设置jar包的入口类(可选) -->
                                    <!--
                                    <mainClass>com.aa.FlinkWordCount</mainClass>
                                    -->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

测试数据准备

使用老早的wc.txt,文件大小370M,数据量:

(hue,17280000)
(spark,17280000)
(hbase,11520000)
(hadoop,10848000)
(hive,5760000)

与之前一致。

Scala程序编写

package com.aa.start

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * 使用Scala语言调用Flink框架实现词频统计:WordCount
 */
object FlinkWordCount {

  def main(args: Array[String]): Unit = {
    // 执行环境-env
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    var core:Int = 7  
    env.setParallelism(core)

    // 数据源-source
    val inputDataSet: DataSet[String] = env.readTextFile("datas/wc.txt")// 文件大小370M

    val startTime: Long = System.currentTimeMillis()

    // 数据转换-transformation
		val resultDataSet: DataSet[(String, Int)] = inputDataSet
			// 先过滤
			.filter(line => null != line && line.trim.length > 0)
			// 切分为单词
			.flatMap(line => line.trim.split("\\\\W+"))
			// 转换为二元组
			.map(word => word -> 1)
			// 分组求和
			.groupBy(0).sum(1)
			// 降序排序
			.sortPartition(1, Order.DESCENDING)

    // 数据终端-sink
		resultDataSet.printToErr() // 输出为红色
    /*
    (hue,17280000)
    (spark,17280000)
    (hbase,11520000)
    (hadoop,10848000)
    (hive,5760000)
     */

    val endTime: Long = System.currentTimeMillis()
    println(s"调用 ${core} Core,用时 : ${endTime}-${startTime} = ${endTime-startTime} ms")

    // 触发执行-execute
    // 批处理不需要触发也能执行
  }

}

执行

分多次,修改Core值后得出结果:

调用 8 Core,用时 : 1626513986430-1626513966494 = 19936 ms
调用 7 Core,用时 : 1626511920251-1626511898580 = 21671 ms
调用 4 Core,用时 : 1626511982015-1626511957206 = 24809 ms
调用 2 Core,用时 : 1626512053248-1626512021101 = 32147 ms
调用 1 Core,用时 : 1626512124226-1626512072327 = 51899 ms

分析结果

可以看出,并行度1和2还是有本质区别的。随后提高并行度并不是线性关系。在本机环境运行,有后台程序吃CPU,读写IO也需要时间,很难让Flink吃完100%的CPU。不过大体上还是能够通过提高并行度的方式来提高多核CPU的运算速度。

可以把Parallelism并行度设置为CPU核数以确保跑满CPU,带超线程的CPU(例如笔者的老爷机是I7-6700HQ,只有4核8线程)需要设置为CPU核数的2倍。高于此数值时/还有其余后台程序吃CPU,如果CPU已经跑满100%,性能提升当然不明显。

以上是关于使用Scala语言调用Flink框架进行WordCount词频统计测试不同Parallelism并行度对运算速度的影响的主要内容,如果未能解决你的问题,请参考以下文章

入门大数据---Flink开发环境搭建

使用akka框架和scala语言编写简单的RPC通信案例

scala语言介绍篇

2021年大数据常用语言Scala:基础语法学习 方法调用方式

Flink学习 批流版本的wordcount JAVA版本

Flink学习 批流版本的wordcount JAVA版本