kryo的速度测试

Posted 猫二哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kryo的速度测试相关的知识,希望对你有一定的参考价值。

1背景介绍
这里主要想测试一些spark的优化方式之一的kryo。场景为通过数1000w的数据,通过日期分组,求一个点击字段的sum。使用了kryo和没使用kryo的时间对比。这里由于环境限制,主要是使用到了kryo在各个机器之间的传输序列化(这里是内网很快),传入内存序列化,磁盘数据RDD的序列化(这个案列没有用到)。

数据格式:
id,addtime,deviceNum,itemid,op_type,op_num,inserttime
这是对一个物品的点击次数,已经通过addtime,deviceNum,itemid分组求和了。
需求为计算每天的点击数:通过日期分组,求和。
“select addtime,sum(op_num) as num from day_epinfo group by addtime”
数据大小:
一个月数据,已经通过addtime,deviceNum,itemid分组求和之后,数据量866M,一共1000w多条数据
每个子节点已经的数据目录已经有数据了
数据来源,自己写个java随机生成呗。

spark集群环境介绍

2没有使用kryo
开始时间:2017-04-08 18:24:35
结束时间:2017-04-08 18:25:06
耗时31秒,是比较快的,因为数据本地性,并且使用kryo的话,数据的传输也是在这两个节点上传输,对于这个场景速度提升有限,但是肯定会提升,这里讲原理。

3使用kryo

开始时间:2017-04-08 18:52:14
结束时间:2017-04-08 18:52:36
耗时22秒,可以看出kryo还是有效果的。

可以看出使用了kryo的效果会好一点,这里因为只是用到了kryo的在集群之间的传输序列化,而且本身2台机器在同一个vm中,同一个电脑中,所以速度本身就好快,效果只好一点。
如果在网络中使用kryo的话,效果会很明显。

4kryo的优缺点
序列化速度快,但是不能支持所有的Serializable,我所知道的是不能序列化和反序列化map类型,其他不知道。并且对应的数据的类需要注册,如果不注册会比较浪费内存。

ps:
在编译环境和运行环境的各种jar的版本要一直,不然可能会报如下错误

这里是spark的job监控界面:

测试代码如下:

package com.j.kryo

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.SparkSession

/**
  * Created by Administrator on 2017/3/21.
  * kryo的序列化案列
  */
object kryo 

  //日志格式
  case class Day_epinfo(id:Int,addtime:String,deviceNum:String,comicId:String,op_type:String,op_num:Int,insertTime:String)
  case class Persion(name:String,age:String)
  def main(args:Array[String]):Unit=

    println("开始时间:"+getNowDate())
     val spark = SparkSession.builder()
        .appName("kryo")
     .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
      spark.sparkContext.setLogLevel("ERROR")

    spark.sparkContext.getConf.registerKryoClasses(Array(classOf[Day_epinfo]))

    /*System.setProperty("hadoop.home.dir","E:\\\\bigdata\\\\hadoop-2.6.4")
    val warehouseLocation ="E:\\\\mllibworksparce2\\\\mllibworksparce2\\\\spark-warehouse"
    val spark = SparkSession.builder()
      .appName("kryo")
        .master("local[2]")
      .config("spark.sql.warehouse.dir",warehouseLocation)
     .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
*/
   val data_souce_path=args(0)
    val data_result_path=args(1)
   /*val data_souce_path="data/epinfo_log_201702_tmp"
    val data_result_path="data/result"*/

    val data = spark.read.textFile(data_souce_path)

    import spark.implicits._

  val data2 = data.map(line=>line.split("\\t"))
      .map(attributes=>Day_epinfo(attributes(0).toInt,attributes(1),attributes(2),attributes(3),attributes(4),attributes(5).toInt,attributes(6)))


    data2.createOrReplaceTempView("day_epinfo")
    val count = spark.sql("select count(1) from day_epinfo")
   val result =  spark.sql("select addtime,sum(op_num) as num from day_epinfo group by addtime")
    count.show()
    result.show()
    println("结束时间:"+getNowDate())
    /*result.write.save(data_result_path)

    spark.read.load(data_result_path).toDF().show()*/
    spark.close()
  
  def getNowDate():String=
    var now:Date = new Date()
    var  dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    var hehe = dateFormat.format( now )
    hehe
  

maven的pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.j</groupId>
  <artifactId>j-mllib</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>

    <!--<dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.4</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.11.8</version>
      </dependency>
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-compiler</artifactId>
          <version>2.11.8</version>
      </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.1.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
          <version>2.1.0</version>
      </dependency>
    <dependency>
      <groupId>com.esotericsoftware</groupId>
      <artifactId>kryo</artifactId>
      <version>4.0.0</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>$scala.version</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>$scala.version</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

以上是关于kryo的速度测试的主要内容,如果未能解决你的问题,请参考以下文章

Kryo 使用指南

Spark-Kryo序列化框架

kryo

实战Redis序列化性能测试(Kryo和字符串)

Spark 调优之RDD持久化级别及kryo序列化性能测试

Kryo序列化:Class Not Found的可能原因