spark笔记

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark笔记相关的知识,希望对你有一定的参考价值。

spark笔记

比较

Hadoop生态圈:MapReduce Hive Storm Yarn Sqoop Flume HDFS

Spark它会代替Hadoop:不能,可能会替代MR
炒作 社区比较完善

hadoop3.0作者:Doug cutting(hadoop以后也要走内存)

Spark:它是一种基于内存的计算框架 机器学习
Spark core:是最核心的部分,包含了Spark最基本、最核心的功能和基础分布式算子。Spark core的基本功能有任务调度,内存管理、故障恢复以及和存储系统的交互。

Apache Spark? is a fast and general engine for large-scale data processing.

Hadoop: MapReduce Hive Storm(JStorm) HDFS Mahout(pig) Giraph(图计算系统)

Spark: Spark core Spark Sql(shark) Spark Streaming Tachyon(内存的存储文件系统) MLlib GraphX

Speed:
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.

Ease of Use:
Write applications quickly in Java, Scala, Python, R.

Generality:
Combine SQL, streaming, and complex analytics.

Runs Everywhere:

配置

Spark的编译:

http://spark.apache.org/docs/1.3.0/building-spark.html(地址)
http://www.scala-lang.org/download/(scala下载地址)

下载源码 -> 解压 -> 到根目录,对源码进行编译

在编译之前:

  1. 替换maven本地仓库 repository

  2. 配置镜像文件

3.域名解析
nameserver 8.8.4.4(4.4.4.4)
nameserver 8.8.8.8

3.对make-ditribution脚本进行修改

SCALA_VERSION=2.10
SPARK_HADOOP_VERSION=2.5.0-cdh5.3.3
SPARK_HIVE=1

VERSION=$("$MVN" help:evaluate -Dexpression=project.version 2>/dev/null | grep -v "INFO" | tail -n 1)
SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version [email protected] 2>/dev/null    | grep -v "INFO"    | tail -n 1)
SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive [email protected] 2>/dev/null    | grep -v "INFO"    | fgrep --count "<id>hive</id>";    # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
    # because we use "set -o pipefail"
    echo -n)

4.就是对Spark的源码进行编译(在根目录下)
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.3 -Pyarn -Phive-0.13.1 -Phive-thriftserver

HDFS: 一个namenode 多个datanode

一个master 多个worker(一台机器,可以有多个worker)
每一个spark-shell(spark-submit)它其实就是一个应用程序(现在它是运行在我们的standalone集群上)

开启日志

修改 default 文件

查看报错日记
技术分享
技术分享

技术分享

1492845901272

架构

spark有三种模式:

  1. 本地模式:(什么也不用配置)
    本地模式和standalone 一般是运用在测试环境

  2. Spark Standalone配置:

slaves:hadoop01.hpqh.com

spark-env.sh:


JAVA_HOME=/home/softwares/jdk1.8.0_60
SCALA_HOME=/home/hadoop01/softwares/scala-2.11.4

HADOOP_CONF_DIR=/home/hadoop01/softwares/hadoop-2.5.0-cdh5.3.3/etc/hadoop

SPARK_MASTER_IP=hadoop01.hpqh.com
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=lg
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=1
启动各个进程:
	sbin/start-master.sh
	sbin/start-slaves.sh

http://hadoop01.hpqh.com:4040/jobs/
http://hadoop01.hpqh.com:8080/

如果想让你的应用程序运行在standalone集群模式上,需要配置spark-default.conf:
spark.master spark://hadoop01.hpqh.com:7077

在spark中,每一个应用程序都只有一个SparkContext,

一台机器中可以设置多个worker,一个worker中可以包含多个executors(执行者),一个executors可以包含多个task

Rdd:transformation action

rdd1 -> 

val textfile = sc.textFile("hdfs://hadoop01.hpqh.com:8020/user/hadoop01/mapreduce/easy.txt")
val rdd1 = textfile.flatMap(word => word.split(" ")))
val rdd2 = rdd1.map(word => (word,1))
val rdd3 = rdd2.reduceByKey((a,b) => (a+b))

val result = sc.textFile("hdfs://hadoop01.hpqh.com:8020/user/hadoop01/mapreduce/easy.txt").flatMap(word => word.split(" ")).map(word => (word,1)).reduceByKey((a,b) => (a+b)).
saveasTextFile("hdfs://hadoop01.hpqh.com:8020/xxxx")

执行过程

技术分享

1492840334611

  • app
    • job1
      • task
        • rdd1
        • rdd2
    • job2
    • job……

依赖

宽依赖

窄依赖

spark streaming

技术分享

1493172519459

技术分享

1493173053337

技术分享

1493173547329

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()

技术分享

1493191180330

import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("hdfs://wei:9000/input")
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_+_)
wordCounts.saveAsTextFiles("hdfs://wei:9000/output/spark_streaming")
ssc.start()

高级数据源

  • Kafka
  • Flume
  • Kinesis

实例 : 与Flume集成

技术分享

1493275395492

Flume Agent

  1. agent

  2. source

  3. channel

  4. sink

  5. 绑定
    source 和channel
    source 和channel
    技术分享

    技术分享

    1493276607955

注意事项:

  1. 先启动spark streaming

附件pic



























以上是关于spark笔记的主要内容,如果未能解决你的问题,请参考以下文章

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

学习笔记:python3,代码片段(2017)

Spark闭包与序列化

大数据高级开发工程师——Spark学习笔记

Spark基础学习笔记01:初步了解Spark

Spark学习笔记——Spark Streaming