进行SparkRDD各类操作----SparkMllib操作基础

Posted 汪本成

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进行SparkRDD各类操作----SparkMllib操作基础相关的知识,希望对你有一定的参考价值。

就问大家什么最重要,那就是基础,这次分享的是平常我们易忽略的基础知识,RDD的各种灵活变换和操作,为了方便大家理解,我这里就写的很简单,但是大家可以从中领会他的灵活之处,就算我们以后做流式计算还是机器学习都会和肯定大部分需要的是他,这里大概我就直接代码实践给大家看,我们毕竟是追究程序的

package rdd

import org.apache.log4j.Level, Logger
import org.apache.spark.SparkContext, SparkConf

/**
  *
  * Created by 汪本成 on 2016/8/1.
  */
object RDDMethod 

  //屏蔽不必要的日志显示在终端上
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  //程序入口
  val conf = new SparkConf().setAppName("testRDDMethod").setMaster("local")
  val sc = new SparkContext(conf)

  val array_Int = Array(1, 2, 3, 4, 5, 6, 7)
  val array_String = Array("abc", "b", "c", "d", "e", "f", "abc")
  val array_Cartesian1 = Array(2, 3, 4, 5, 6, 7, 8)
  val array_Cartesian2 = Array(8, 7, 6, 5, 4, 3, 2)
  val array_Count = Array((1, "cool"), (2, "good"), (1, "bad"), (1, "fine"))
  val array_SetKey = Array("one", "two", "three", "four", "five")
  val array_String2 = Array("g", "h", "i", "j", "k", "l", "m")

  var rdd = new RDDMethod()

  def main(args: Array[String]) 

    aggregate_Max1(array_Int)
    println("*******************************************************************")
    aggregate_Max2(array_Int)
    println("*******************************************************************")
    aggragate_String(array_String)
    println("*******************************************************************")
    use_Cache(array_String)
    println("*******************************************************************")
    use_Foreach(array_String)
    println("*******************************************************************")
    use_Cartesian(array_Cartesian1, array_Cartesian2)
    println("*******************************************************************")
    use_Coalesce(array_Int)
    println("*******************************************************************")
    use_Repartition(array_Int)
    println("*******************************************************************")
    use_CountByValue(array_Int)
    println("*******************************************************************")
    use_CountByKey(array_Count)
    println("*******************************************************************")
    use_Distinct(array_String)
    println("*******************************************************************")
    use_Filter(array_Int)
    println("*******************************************************************")
    use_FlatMap(array_Int)
    println("*******************************************************************")
    use_Map(array_Int)
    println("*******************************************************************")
    use_groupBy(array_Int)
    println("*******************************************************************")
    use_KeyBy(array_SetKey)
    println("*******************************************************************")
    use_reduce(array_String)
    println("*******************************************************************")
    use_SortBy(array_Count)
    println("*******************************************************************")
    use_zip(array_Int, array_String, array_String2)

  

  //测试aggregate方法
  //aggregate操作未分区的IntArray
  def aggregate_Max1(array: Array[Int]) = 
    val arr = sc.parallelize(array)
    val result = arr.aggregate(0)(math.max(_, _), _ + _)
    println(result)
  

  //aggregate操作分区为2IntArray
  def aggregate_Max2(array: Array[Int]) = 
    val arr = sc.parallelize(array, 2)
    val result = arr.aggregate(0)(math.max(_, _), _ + _)
    println(result)
  

  //aggregate操作StringArray
  def aggragate_String(array: Array[String]) = 
    val result = sc.parallelize(array).aggregate("")((value, word) => value + word, _ + _)
    println(result)
  

  //cache用法
  def use_Cache(array: Array[String]) = 
    val arr = sc.parallelize(array)
    println(arr)         //打印一个RDD的存储格式
    println("-------------------------------------------------------------")
    println(arr.cache())  //打印数据结果
  

  //foreach(println)演示
  def use_Foreach(array: Array[String]) = 
    //foreach(println)是一个专门用来打印未进行的Action操作的数据的专用方法,可以对数据进行提早计算
    sc.parallelize(array).foreach(println)
  

  //cartesian方法
  //此方法用于对不同的数组进行笛卡尔操作,要求数据集的长度必须相同,结果作为一个新的数据集返回
  def use_Cartesian(array1: Array[Int], array2: Array[Int]) = 
    //创建两个数组并进行笛卡尔计算,打印计算结果
    sc.parallelize(array1).cartesian(sc.parallelize(array2)).foreach(println)
  

  //coalesce方法
  def use_Coalesce(array: Array[Int]) = 
    val arr1 = sc.parallelize(array)
    val arr2 = arr1.coalesce(2, true)
    //计算数据值
    val result = arr1.aggregate(0)(math.max(_, _), _ + _)
    //计算重新分区的数据值
    val result2 = arr2.aggregate(0)(math.max(_, _), _ + _)
    //打印结果
    println(result)
    println(result2)
  

  //repartition方法
  def use_Repartition(array: Array[Int]) = 
    //分区数
    val arr_num = sc.parallelize(array).repartition(3).partitions.length
    println(arr_num)
  

  //countByValue方法
  //此方法是计算数据集某个数据出现的次数,并将其以map的形式返回
  def use_CountByValue(array: Array[Int]) = 
    sc.parallelize(array).countByValue().foreach(println)
  

  //countByKey方法
  //此方法是计算数组中元数据键值对key出现的个数
  def use_CountByKey(array: Array[(Int, String)]) = 
    sc.parallelize(array).countByKey().foreach(println)
  

  //distinct方法
  //除去数据集中重复项的方法
  def use_Distinct(array: Array[String]) = 
    sc.parallelize(array).distinct().foreach(println)
  

  //filter方法
  //此方法是对数据集进行过滤
  def use_Filter(array: Array[Int]) = 
    sc.parallelize(array).filter(_ >= 3).foreach(println)
  

  //flatMap方法
  //以行为单位进行数据操作,其在定义时就是针对数据集的数据作为一个整体进行操作,最终返回也是一个数据集
  def use_FlatMap(array: Array[Int]) = 
    sc.parallelize(array).flatMap(x => List(x + 1)).collect().foreach(println)
  

  //map方法
  //此方法可以对RDD数据集中的数据进行逐个操作
  //map直接对数据集中的数据做单独的处理
  def use_Map(array: Array[Int]) = 
    sc.parallelize(array).map(x => List(x + 1)).collect().foreach(println)
  

  //groupBy方法
  //groupBy方法第一个参数是传入方法名,第二个参数是分组的标签值
  def use_groupBy(array: Array[Int]) = 
    val arr = sc.parallelize(array)
    arr.groupBy(rdd.myFilter(_), 1).foreach(println)
    println("-----------------------------------------------------------------")
    arr.groupBy(rdd.myFilter2(_), 2).foreach(println)
  

  //KeyBy方法
  //此方法是为数据集中的每个个体数据增加一个key,从而可以与原来的个体数据形成键值对
  def use_KeyBy(array: Array[String]) = 
    //设置key配置方法,并打印结果
    sc.parallelize(array).keyBy(word => word.size).foreach(println)
  


  //reduce方法
  def use_reduce(array: Array[String]) = 
    val arr = sc.parallelize(array)
    //字符串的拼接
    arr.reduce(_ + _).foreach(println)
    //打印最长字符串
    arr.reduce(funLength).foreach(println)
  

  /**
    * 寻找最长字符串
    *
    * @param str1
    * @param str2
    * @return
    */
  def funLength(str1: String, str2: String): String = 
    var str = str1
    if (str2.size >= str.size) 
      str = str2
    
    return str
  

  //sortBy方法
  def use_SortBy(array: Array[(Int, String)]) = 
    var str = sc.parallelize(array)
    //按照第一个数据进行排序
    str = str.sortBy(word => word._1, true)
    //按照第二个数据进行排序
    val str2 = str.sortBy(word => word._2, true)
    str.foreach(println)
    str2.foreach(println)
  

  //zip方法
  //此方法将若干个RDD压缩成一个新的RDD,进而形成一系列的键值对存储形式的RDD
  def use_zip(array1: Array[Int], array2: Array[String], array3: Array[String]) = 
    array1.zip(array2).zip(array3).foreach(println)
  


class RDDMethod

  /**
    * 大于3的数
    *
    * @param num
    */
  def myFilter(num: Int): Unit = 
    num >= 3
  

  /**
    * 小于3的数
    *
    * @param num
    */
  def myFilter2(num: Int): Unit =
    num < 3
  


写的很垃圾,大家莫笑,程序结果事例运行如下

"C:\\Program Files\\Java\\jdk1.8.0_77\\bin\\java" -Didea.launcher.port=7533 "-Didea.launcher.bin.path=D:\\Program Files (x86)\\JetBrains\\IntelliJ IDEA 15.0.5\\bin" -Dfile.encoding=UTF-8 -classpath "C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\charsets.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\deploy.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\access-bridge-64.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\cldrdata.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\dnsns.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\jaccess.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\jfxrt.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\localedata.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\nashorn.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\sunec.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\sunjce_provider.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\sunmscapi.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\sunpkcs11.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\ext\\zipfs.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\javaws.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\jce.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\jfr.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\jfxswt.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\jsse.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\management-agent.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\plugin.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\resources.jar;C:\\Program Files\\Java\\jdk1.8.0_77\\jre\\lib\\rt.jar;G:\\location\\spark-mllib\\out\\production\\spark-mllib;C:\\Program Files (x86)\\scala\\lib\\scala-actors-migration.jar;C:\\Program Files (x86)\\scala\\lib\\scala-actors.jar;C:\\Program Files (x86)\\scala\\lib\\scala-library.jar;C:\\Program Files (x86)\\scala\\lib\\scala-reflect.jar;C:\\Program Files (x86)\\scala\\lib\\scala-swing.jar;G:\\home\\download\\spark-1.6.1-bin-hadoop2.6\\lib\\datanucleus-api-jdo-3.2.6.jar;G:\\home\\download\\spark-1.6.1-bin-hadoop2.6\\lib\\datanucleus-core-3.2.10.jar;G:\\home\\download\\spark-1.6.1-bin-hadoop2.6\\lib\\datanucleus-rdbms-3.2.9.jar;G:\\home\\download\\spark-1.6.1-bin-hadoop2.6\\lib\\spark-1.6.1-yarn-shuffle.jar;G:\\home\\download\\spark-1.6.1-bin-hadoop2.6\\lib\\spark-assembly-1.6.1-hadoop2.6.0.jar;G:\\home\\download\\spark-1.6.1-bin-hadoop2.6\\lib\\spark-examples-1.6.1-hadoop2.6.0.jar;D:\\Program Files (x86)\\JetBrains\\IntelliJ IDEA 15.0.5\\lib\\idea_rt.jar" com.intellij.rt.execution.application.AppMain rdd.RDDMethod
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/G:/home/download/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/G:/home/download/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/08/02 19:29:01 INFO Slf4jLogger: Slf4jLogger started
16/08/02 19:29:01 INFO Remoting: Starting remoting
16/08/02 19:29:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.101:47358]

7
*******************************************************************
10
*******************************************************************
abcbcdefabc
*******************************************************************
ParallelCollectionRDD[3] at parallelize at RDDMethod.scala:94
-------------------------------------------------------------
ParallelCollectionRDD[3] at parallelize at RDDMethod.scala:94
*******************************************************************
abc
b
c
d
e
f
abc
*******************************************************************
(2,8)
(2,7)
(2,6)
(2,5)
(2,4)
(2,3)
(2,2)
(3,8)
(3,7)
(3,6)
(3,5)
(3,4)
(3,3)
(3,2)
(4,8)
(4,7)
(4,6)
(4,5)
(4,4)
(4,3)
(4,2)
(5,8)
(5,7)
(5,6)
(5,5)
(5,4)
(5,3)
(5,2)
(6,8)
(6,7)
(6,6)
(6,5)
(6,4)
(6,3)
(6,2)
(7,8)
(7,7)
(7,6)
(7,5)
(7,4)
(7,3)
(7,2)
(8,8)
(8,7)
(8,6)
(8,5)
(8,4)
(8,3)
(8,2)
*******************************************************************
7
13
*******************************************************************
3
*******************************************************************
(5,1)
(1,1)
(6,1)
(2,1)
(7,1)
(3,1)
(4,1)
*******************************************************************
(1,3)
(2,1)
*******************************************************************
d
e
b
f
abc
c
*******************************************************************
3
4
5
6
7
*******************************************************************
2
3
4
5
6
7
8
*******************************************************************
List(2)
List(3)
List(4)
List(5)
List(6)
List(7)
List(8)
*******************************************************************
((),CompactBuffer(1, 2, 3, 4, 5, 6, 7))
-----------------------------------------------------------------
((),CompactBuffer(1, 2, 3, 4, 5, 6, 7))
*******************************************************************
(3,one)
(3,two)
(5,three)
(4,four)
(4,five)
*******************************************************************
a
b
c
b
c
d
e
f
a
b
c
a
b
c
*******************************************************************
(1,cool)
(1,bad)
(1,fine)
(2,good)
(1,bad)
(1,cool)
(1,fine)
(2,good)
*******************************************************************
((1,abc),g)
((2,b),h)
((3,c),i)
((4,d),j)
((5,e),k)
((6,f),l)
((7,abc),m)
16/08/02 19:29:03 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

Process finished with exit code 0

以上是关于进行SparkRDD各类操作----SparkMllib操作基础的主要内容,如果未能解决你的问题,请参考以下文章

SparkRDD操作具体解释4——Action算子

sparkRDD操作

SparkRDD转换操作

SparkRDD转换操作

SparkRDD转换操作

SparkRDD操作具体解释2——值型Transformation算子