Spark 编程基础

Posted 见贤思小齐,知足常乐呵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 编程基础相关的知识,希望对你有一定的参考价值。

1. 初始化Spark

import org.apache.spark.{SparkContext, SparkConf}

val conf=new SparkConf().setAppName("RDD1").setMaster("local")
val sc=new SparkContext(conf)

2. 创建RDD的方法

内存:Parallelize 或者 makeRDD

外部文件:textFile

//1.  both Parallelize and makeRDD could create RDD from In-Memory
 val distData=sc.parallelize(data)                   // parallelize
 val distData1=sc.makeRDD(data)                 // makeRDD 

//2 textFile could create RDD from files
val distFile=sc.textFile("E:/Java_WS/ScalaDemo/data/wc.txt") 

3. 键值对

下面两者等价:

myRDD. map (s=> (s,1))
myRDD. map (_,1)

reduceByKey 和sortByKey、groupByKey

distFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)   
distFile.flatMap(_.split(" ")).map(s=>(s,1)).sortByKey().collect().foreach(println)
distFile.flatMap(_.split(" ")).map(s=>(s,1)).groupByKey().foreach(println)

1)返回key 以及 每个key的个数 (key, cnt)

2)返回 (key,value) 排序后的

3)返回(key, (value1,value2...))

4. RDD 持久化  

 persist() 或 cache()

 unpersist() 可以删除缓存RDD

5. 广播变量和累加器

  • 通过sc.broadcast(v) 和 sc.accumulator(初始值,comments)定义
  • 通过value访问其值。
  • 广播变量不能修改了
  • 累加器只能通过add 或者 +=修改
//SparkContext.broadcast(v)  is a broadcast variable, could replace v in any place of the cluster
val broadcastVar=sc.broadcast(Array(1,2,3))
println(broadcastVar.value(0),broadcastVar.value(1),broadcastVar.value(2))
    
val accum=sc.accumulator(0,"My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
println(accum.value)

  

以上是关于Spark 编程基础的主要内容,如果未能解决你的问题,请参考以下文章

独家 | PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)

python+spark程序代码片段

spark 深入学习 05RDD编程之旅基础篇02-Spaek shell

IEEAC云专委前沿说PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)

Spark编程实战-词频统计

Spark编程实战-词频统计