Spark Notes
Preface: This article was compiled by the editors of 小常识网 (cha138.com) and mainly covers notes on Spark; we hope it is of some reference value to you.
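Comparison
Hadoop ecosystem: MapReduce, Hive, Storm, YARN, Sqoop, Flume, HDFS
Will Spark replace Hadoop? No, but it may replace MapReduce (MR).
Hype aside, the community is fairly well developed.
Hadoop 3.0 / Doug Cutting (Hadoop's creator): Hadoop is also expected to move toward in-memory processing.
Spark: an in-memory computing framework, also used for machine learning.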
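Spark Core: the most central part of Spark. It contains Spark's most basic and essential functionality and the basic distributed operators. Its core functions are task scheduling, memory management, fault recovery, and interaction with storage systems.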
Apache Spark™ is a fast and general engine for large-scale data processing.
Hadoop: MapReduce, Hive, Storm (JStorm), HDFS, Mahout (Pig), Giraph (graph processing system)
Spark: Spark Core, Spark SQL (Shark), Spark Streaming, Tachyon (in-memory storage file system), MLlib, GraphX
Speed:
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
Ease of Use:
Write applications quickly in Java, Scala, Python, R.
Generality:
Combine SQL, streaming, and complex analytics.
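Runs Everywhere:
Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.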
Configuration
Building Spark:
http://spark.apache.org/docs/1.3.0/building-spark.html (build guide)
http://www.scala-lang.org/download/ (Scala download page)
Download the source -> extract it -> go to the root directory and build the source.
Before building:
1. Replace the local Maven repository (repository).
2. Configure the mirror file.
3. DNS resolution (/etc/resolv.conf):
nameserver 8.8.4.4 (or 4.4.4.4)
nameserver 8.8.8.8
4. Modify the make-distribution.sh script: hard-code the versions so the script does not have to query Maven for them:
SCALA_VERSION=2.10
SPARK_HADOOP_VERSION=2.5.0-cdh5.3.3
SPARK_HIVE=1
For reference, these are the original lines in the script that compute these values through Maven:
VERSION=$("$MVN" help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1)
SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null | grep -v "INFO" | tail -n 1)
SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ 2>/dev/null | grep -v "INFO" | fgrep --count "<id>hive</id>"; # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# because we use "set -o pipefail"
echo -n)
5. Build the Spark source (in the root directory):
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.3 -Pyarn -Phive-0.13.1 -Phive-thriftserver
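HDFS: one NameNode, multiple DataNodes.
Spark standalone: one Master, multiple Workers (one machine can run several Workers).
Each spark-shell (or spark-submit) session is in fact one application (here it runs on our standalone cluster).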
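Enabling logs:
Edit the defaults file (conf/spark-defaults.conf), e.g. set spark.eventLog.enabled to true and point spark.eventLog.dir at a log directory.
Check the error logs.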
Architecture
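Spark has three modes:
- Local mode (no configuration needed). Local mode and standalone mode are generally used in test environments.
- Spark Standalone configuration:
conf/slaves: hadoop01.hpqh.com
conf/spark-env.sh: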
JAVA_HOME=/home/softwares/jdk1.8.0_60
SCALA_HOME=/home/hadoop01/softwares/scala-2.11.4
HADOOP_CONF_DIR=/home/hadoop01/softwares/hadoop-2.5.0-cdh5.3.3/etc/hadoop
SPARK_MASTER_IP=hadoop01.hpqh.com
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=1
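SPARK_WORKER_CORES and SPARK_WORKER_MEMORY are per-worker settings. The memory value is a number with a unit suffix such as 512m or 1g. The sketch below shows how these settings combine. It is plain Scala with no Spark dependency, and WorkerEnvSketch and its methods are hypothetical helpers, not Spark APIs. It does three things:
- parses spark-env.sh-style KEY=VALUE lines;
- rejects a malformed size such as "lg";
- multiplies the per-worker values by SPARK_WORKER_INSTANCES to get what one machine offers.

```scala
// Plain Scala sketch (no Spark dependency). WorkerEnvSketch and its methods
// are hypothetical helpers for illustration, not Spark APIs.
object WorkerEnvSketch {
  // A size is digits followed by a unit: k, m, g or t (case-insensitive).
  private val Size = """(?i)(\d+)([kmgt])""".r

  // Convert a memory string such as "512m" or "1g" to megabytes.
  // Returns None for malformed values such as "lg" (letter l, not digit 1).
  def toMb(value: String): Option[Long] = value.trim match {
    case Size(num, unit) =>
      val n = num.toLong
      unit.toLowerCase match {
        case "k" => Some(n / 1024)
        case "m" => Some(n)
        case "g" => Some(n * 1024)
        case _   => Some(n * 1024 * 1024) // "t"
      }
    case _ => None
  }

  // Parse KEY=VALUE lines, skipping blank lines and # comments.
  def parse(lines: Seq[String]): Map[String, String] =
    lines.map(_.trim)
      .filter(l => l.nonEmpty && !l.startsWith("#") && l.contains("="))
      .map { l =>
        val i = l.indexOf('=')
        l.substring(0, i).trim -> l.substring(i + 1).trim
      }
      .toMap

  // Cores and memory (MB) offered by one machine: the per-worker values
  // multiplied by SPARK_WORKER_INSTANCES (treated as 1 when unset).
  def machineTotals(env: Map[String, String]): Option[(Int, Long)] =
    for {
      cores <- env.get("SPARK_WORKER_CORES").map(_.toInt)
      mem   <- env.get("SPARK_WORKER_MEMORY").flatMap(toMb)
      n      = env.get("SPARK_WORKER_INSTANCES").map(_.toInt).getOrElse(1)
    } yield (cores * n, mem * n)

  def main(args: Array[String]): Unit = {
    val env = parse(Seq("SPARK_WORKER_CORES=1", "SPARK_WORKER_MEMORY=1g", "SPARK_WORKER_INSTANCES=1"))
    println(machineTotals(env)) // Some((1,1024))
  }
}
```

With the values above, one machine offers 1 core and 1024 MB. A typo like "lg" yields None instead of a silently wrong setting.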
Start the daemons:
sbin/start-master.sh
sbin/start-slaves.sh
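http://hadoop01.hpqh.com:4040/jobs/ (application web UI, available while an application such as spark-shell is running)
http://hadoop01.hpqh.com:8080/ (Master web UI)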
To run your applications on the standalone cluster, configure conf/spark-defaults.conf:
spark.master spark://hadoop01.hpqh.com:7077
In Spark, each application has exactly one SparkContext.
One machine can run multiple workers, a worker can host multiple executors, and an executor can run multiple tasks.
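How many tasks can run at the same time follows from this worker/executor/task hierarchy. Each running task occupies spark.task.cpus cores (default 1). The limit is therefore the sum over all executors of cores / spark.task.cpus. Below is a plain Scala model of that arithmetic. It has no Spark dependency, and the case-class and method names are hypothetical.

```scala
// Plain Scala model (no Spark dependency); Executor, Worker and
// concurrentTasks are hypothetical names used only for illustration.
object TaskSlotsSketch {
  final case class Executor(cores: Int)
  final case class Worker(executors: Seq[Executor])

  // Each running task occupies taskCpus cores (Spark's spark.task.cpus,
  // default 1), so one executor runs at most cores / taskCpus tasks at once.
  def concurrentTasks(workers: Seq[Worker], taskCpus: Int = 1): Int =
    workers.flatMap(_.executors).map(_.cores / taskCpus).sum

  def main(args: Array[String]): Unit = {
    // The standalone setup above: one worker with a single 1-core executor.
    val single = Seq(Worker(Seq(Executor(1))))
    println(concurrentTasks(single)) // 1
  }
}
```

With SPARK_WORKER_CORES=1 and one worker instance, tasks run one at a time. Adding cores or worker instances raises that limit.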
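RDD operations: transformations and actions. Transformations are lazy and only record the lineage; an action (such as saveAsTextFile or count) triggers the actual computation.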
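Word count example (each transformation produces a new RDD: textFile -> rdd1 -> rdd2 -> rdd3):
// Read the input file from HDFS; each element is one line
val textfile = sc.textFile("hdfs://hadoop01.hpqh.com:8020/user/hadoop01/mapreduce/easy.txt")
// Split each line into words
val rdd1 = textfile.flatMap(line => line.split(" "))
// Pair each word with a count of 1
val rdd2 = rdd1.map(word => (word, 1))
// Sum the counts per word (this causes a shuffle)
val rdd3 = rdd2.reduceByKey((a, b) => a + b)
// The same pipeline chained in one expression; saveAsTextFile is an action and returns Unit
sc.textFile("hdfs://hadoop01.hpqh.com:8020/user/hadoop01/mapreduce/easy.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b).
saveAsTextFile("hdfs://hadoop01.hpqh.com:8020/xxxx")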
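To see what each step of the word count produces without a cluster, the same pipeline can be mirrored on ordinary Scala collections (Scala 2.13+). This is a local analogy, not how Spark executes the job. LocalWordCount is a hypothetical name, and groupMapReduce stands in for reduceByKey.

```scala
// Plain Scala analogy of the RDD word count (no Spark dependency, Scala 2.13+).
object LocalWordCount {
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(line => line.split(" ")) // like rdd1 (flatMap)
    val pairs = words.map(word => (word, 1))            // like rdd2 (map)
    // like rdd3: group by the word and sum the 1s, as reduceByKey does
    pairs.groupMapReduce(_._1)(_._2)((a, b) => a + b)
  }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("hello spark", "hello hadoop")))
}
```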
Execution process:
- app
  - job1
    - task
      - rdd1
      - rdd2
    - task
  - job2
  - job…
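Each job in this hierarchy is triggered by an action, while the transformations before it are lazy. Scala collection views behave in a similar way. The local analogy below (plain Scala, no Spark dependency; LazySketch is a hypothetical name) shows that nothing is computed until a result is forced.

```scala
// Local analogy only (no Spark dependency): a collection view, like an RDD
// transformation, records work without doing it until a result is forced.
object LazySketch {
  def run(): (Int, Int, Int) = {
    var evaluated = 0
    val data = Vector(1, 2, 3, 4)
    // "Transformation": builds a lazy view; the function has not run yet
    val scaled = data.view.map { x => evaluated += 1; x * 10 }
    val before = evaluated
    // "Action": summing forces every element to be computed once
    val total = scaled.sum
    (before, evaluated, total)
  }

  def main(args: Array[String]): Unit =
    println(run()) // (0,4,100)
}
```

In Spark the "force" step is an action such as saveAsTextFile or count, and each action submits a separate job.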
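Dependencies
Wide dependency: a partition of the parent RDD may be used by multiple partitions of the child RDD, which requires a shuffle (e.g. reduceByKey, groupByKey).
Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD, so no shuffle is needed (e.g. map, flatMap, filter).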
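Wide dependencies are where Spark splits a job into stages. Narrow transformations are pipelined within one stage, and each shuffle starts a new one. Below is a plain Scala model of that rule for a linear chain of operations. It has no Spark dependency: the Dep types and the small operation table are illustrative, whereas real Spark derives this from each RDD's dependencies.

```scala
// Plain Scala model (no Spark dependency). The Dep types and the small
// operation table are illustrative; Spark decides this from RDD dependencies.
object StageSketch {
  sealed trait Dep
  case object Narrow extends Dep
  case object Wide extends Dep

  // A few common RDD operations and the dependency they create.
  val depOf: Map[String, Dep] = Map(
    "map" -> Narrow, "flatMap" -> Narrow, "filter" -> Narrow,
    "reduceByKey" -> Wide, "groupByKey" -> Wide, "sortByKey" -> Wide
  )

  // Split a linear chain of operations into stages: narrow operations are
  // pipelined into the current stage, and each wide (shuffle) dependency
  // starts a new one. The first stage also holds the input (e.g. textFile),
  // which is not listed. Unknown names throw NoSuchElementException.
  def stages(ops: Seq[String]): Seq[Seq[String]] =
    ops.foldLeft(Vector(Vector.empty[String])) { (acc, op) =>
      depOf(op) match {
        case Wide   => acc :+ Vector(op)
        case Narrow => acc.init :+ (acc.last :+ op)
      }
    }

  def main(args: Array[String]): Unit =
    // Word count: flatMap and map share a stage, reduceByKey starts the next
    println(stages(Seq("flatMap", "map", "reduceByKey")))
}
```

For the word count above this gives two stages, split at the reduceByKey shuffle.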
Spark Streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that connects to localhost:9999 (feed it with e.g. `nc -lk 9999`)
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
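With a batch interval of Seconds(1), the stream is cut into one-second batches. reduceByKey then counts words within each batch, not across the whole stream; running totals would need a stateful operation such as updateStateByKey. Below is a plain Scala model of that behavior (Scala 2.13+, no Spark dependency). Event and countsPerBatch are hypothetical names.

```scala
// Plain Scala model of micro-batching (no Spark dependency, Scala 2.13+).
// Event and countsPerBatch are hypothetical names for illustration.
object MicroBatchSketch {
  final case class Event(timeMs: Long, line: String)

  // Put each event into the batch interval it falls in (timeMs / batchMs),
  // then word-count every batch on its own. As with reduceByKey on a DStream,
  // counts are per batch and are not carried over to the next batch.
  def countsPerBatch(events: Seq[Event], batchMs: Long): Map[Long, Map[String, Int]] =
    events.groupBy(_.timeMs / batchMs).map { case (batch, inBatch) =>
      batch -> inBatch
        .flatMap(_.line.split(" "))
        .map(word => (word, 1))
        .groupMapReduce(_._1)(_._2)(_ + _)
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(100, "a b"), Event(900, "a"), Event(1500, "a"))
    println(countsPerBatch(events, batchMs = 1000))
  }
}
```

The word "a" is counted twice in the first batch and once in the second. Each batch stands alone, just as wordCounts.print() shows only the current batch.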
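Reading new files from an HDFS directory in spark-shell (which already provides sc):
import org.apache.spark._
import org.apache.spark.streaming._
// Reuse the shell's SparkContext with a 5-second batch interval
val ssc = new StreamingContext(sc, Seconds(5))
// Monitor the directory; only files that appear in it after the stream starts are processed
val lines = ssc.textFileStream("hdfs://wei:9000/input")
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Each batch is written to a directory named spark_streaming-<batch time in ms>
wordCounts.saveAsTextFiles("hdfs://wei:9000/output/spark_streaming")
ssc.start()
ssc.awaitTermination()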
Advanced sources:
- Kafka
- Flume
- Kinesis
Example: integrating with Flume
Flume agent setup:
- agent
- source
- channel
- sink
- binding:
  - bind the source to the channel
  - bind the sink to the channel
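Notes:
- Start Spark Streaming first, before the Flume agent (with the push-based approach, the Spark receiver has to be listening before the Flume Avro sink can deliver data).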