Spark Study Notes - Spark Core
Posted by Mr.zhou_Zxy
Spark-Core
I Driver and Executor Communication
The Driver plays the role of the client; the Executor plays the role of the server.
- Driver code
package com.zxy.Socket

import java.io.OutputStream
import java.net.Socket

object Driver {
    def main(args: Array[String]): Unit = {
        // connect to the server
        val client: Socket = new Socket("localhost", 9999)
        // send a single byte of data
        val out: OutputStream = client.getOutputStream
        out.write(2)
        out.flush()
        out.close()
        client.close()
    }
}
- Executor code
package com.zxy.Socket

import java.io.InputStream
import java.net.{ServerSocket, Socket}

object Executor {
    def main(args: Array[String]): Unit = {
        // start the server to accept data
        val server: ServerSocket = new ServerSocket(9999)
        println("Server started, waiting for data")
        // block until a client connects, then read the data
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val i: Int = in.read()
        println(s"Received data from client: ${i}")
        client.close()
        server.close()
    }
}
Start the server side (Executor) first so that it is waiting for data, then start the client side (Driver), which establishes the connection and sends the data.
II Introducing Spark's Three Data Structures Through an Example
1 Example
Modify the example above so that two Executor servers receive the data, splitting the data held in the Task so that each Executor computes its own share.
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor1 {
    def main(args: Array[String]): Unit = {
        // start the server to accept data
        val server: ServerSocket = new ServerSocket(8888)
        println("Server started, waiting for data")
        // block until a client connects, then deserialize the sub-task
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val TaskOBJ2: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ2.readObject().asInstanceOf[SubTask]
        val ints: List[Int] = task.computer()
        println(s"Result computed on [8888]: ${ints}")
        TaskOBJ2.close()
        client.close()
        server.close()
    }
}
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor2 {
    def main(args: Array[String]): Unit = {
        // start the server to accept data
        val server: ServerSocket = new ServerSocket(9999)
        println("Server started, waiting for data")
        // block until a client connects, then deserialize the sub-task
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val TaskOBJ1: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ1.readObject().asInstanceOf[SubTask]
        val ints: List[Int] = task.computer()
        println(s"Result computed on [9999]: ${ints}")
        TaskOBJ1.close()
        client.close()
        server.close()
    }
}
package com.zxy.Socket

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

object Driver {
    def main(args: Array[String]): Unit = {
        // connect to both servers
        val client1: Socket = new Socket("localhost", 8888)
        val client2: Socket = new Socket("localhost", 9999)
        val task: Task = new Task()
        // send the first half of the data to server 1
        val out1: OutputStream = client1.getOutputStream
        val TaskOBJ1: ObjectOutputStream = new ObjectOutputStream(out1)
        val subTask1 = new SubTask()
        subTask1.logic = task.logic
        subTask1.datas = task.datas.take(2)
        TaskOBJ1.writeObject(subTask1)
        TaskOBJ1.flush()
        TaskOBJ1.close()
        client1.close()
        // send the second half of the data to server 2
        val out2: OutputStream = client2.getOutputStream
        val TaskOBJ2: ObjectOutputStream = new ObjectOutputStream(out2)
        val subTask2 = new SubTask()
        subTask2.logic = task.logic
        subTask2.datas = task.datas.takeRight(2)
        TaskOBJ2.writeObject(subTask2)
        TaskOBJ2.flush()
        TaskOBJ2.close()
        client2.close()
        println("All data sent")
    }
}
package com.zxy.Socket

class Task extends Serializable {
    // the full data set and the computation logic to ship to the executors
    val datas = List(1, 2, 3, 4)
    val logic: Int => Int = _ * 2
}
package com.zxy.Socket

class SubTask extends Serializable {
    // initial values: a slice of the data and the logic, both assigned by the Driver
    var datas: List[Int] = _
    var logic: Int => Int = _

    // apply the logic to this sub-task's slice of the data
    def computer() = {
        datas.map(logic)
    }
}
Start Executor1 and Executor2 first, then start the Driver.
Executor1:
Server started, waiting for data
Result computed on [8888]: List(2, 4)
Executor2:
Server started, waiting for data
Result computed on [9999]: List(6, 8)
Driver:
All data sent
This mirrors Spark's model in miniature: the Driver splits the data in a task, packages each slice together with the computation logic, and ships it to an Executor, which runs the logic on its slice.
2 Spark's Three Data Structures
To handle data processing with high concurrency and high throughput, the Spark computing framework encapsulates three data structures, each targeting a different application scenario:
> RDD: Resilient Distributed Dataset
> Accumulator: a distributed shared write-only variable
> Broadcast variable: a distributed shared read-only variable
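A minimal sketch showing the three structures together, assuming a local Spark environment with spark-core 2.x or later on the classpath (longAccumulator requires 2.0+); the object name ThreeStructuresDemo and the local[*] master are illustrative choices, not part of the original example:

import org.apache.spark.{SparkConf, SparkContext}

object ThreeStructuresDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ThreeStructuresDemo")
        val sc = new SparkContext(conf)

        // RDD: the data set, partitioned across executors
        val rdd = sc.makeRDD(List(1, 2, 3, 4))

        // accumulator: executors only write (add); the Driver reads the merged value
        val sum = sc.longAccumulator("sum")
        rdd.foreach(x => sum.add(x))
        println(s"accumulator value: ${sum.value}") // 10

        // broadcast variable: read-only on executors, shipped once per node
        val factor = sc.broadcast(2)
        val doubled = rdd.map(_ * factor.value).collect()
        println(s"broadcast result: ${doubled.mkString(", ")}") // 2, 4, 6, 8

        sc.stop()
    }
}

Note the direction of access in each case: the executors only add to the accumulator while the Driver reads sum.value, and the executors only read factor.value, matching the write-only and read-only descriptions above.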