如何在CDH5上运行Spark应用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在CDH5上运行Spark应用相关的知识,希望对你有一定的参考价值。

创建 maven 工程
使用下面命令创建一个普通的 maven 工程:
bash
$ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
将 sparkwordcount 目录重命名为simplesparkapp,然后,在 simplesparkapp 目录下添加 scala 源文件目录:
bash
$ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount
修改 pom.xml 添加 scala 和 spark 依赖:
xml
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.0-cdh5.3.0</version>
</dependency>
</dependencies>
添加编译 scala 的插件:
xml
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
添加 scala 编译插件需要的仓库:
xml
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
另外,添加 cdh hadoop 的仓库:
xml
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>maven-hadoop</id>
<name>Hadoop Releases</name>
<url>https://repository.cloudera.com/content/repositories/releases/</url>
</repository>
<repository>
<id>cloudera-repos</id>
<name>Cloudera Repos</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
最后,完整的 pom.xml 文件见: https://github.com/javachen/simplesparkapp/blob/master/pom.xml 。
运行下面命令检查工程是否能够成功编译:
bash
mvn package
编写示例代码
以 WordCount 为例,该程序需要完成以下逻辑:
读一个输入文件
统计每个单词出现次数
过滤少于一定次数的单词
对剩下的单词统计每个字母出现次数
在 MapReduce 中,上面的逻辑需要两个 MapReduce 任务,而在 Spark 中,只需要一个简单的任务,并且代码量会少 90%。
编写 Scala 程序 如下:
scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkWordCount
def main(args: Array[String])
val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
val threshold = args(1).toInt
// split each document into words
val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
// count the occurrence of each word
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
// filter out words with less than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)
// count characters
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
System.out.println(charCounts.collect().mkString(", "))
charCounts.saveAsTextFile("world-count-result")


Spark 使用懒执行的策略,意味着只有当 动作 执行的时候, 转换 才会运行。上面例子中的 动作 操作是 collect 和 saveAsTextFile ,前者是将数据推送给客户端,后者是将数据保存到 HDFS。
作为对比, Java 版的程序 如下:
java
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public class JavaWordCount
public static void main(String[] args)
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
final int threshold = Integer.parseInt(args[1]);
// split each document into words
JavaRDD tokenized = sc.textFile(args[0]).flatMap(
new FlatMapFunction()
public Iterable call(String s)
return Arrays.asList(s.split(" "));


);
// count the occurrence of each word
JavaPairRDD counts = tokenized.mapToPair(
new PairFunction()
public Tuple2 call(String s)
return new Tuple2(s, 1);


).reduceByKey(
new Function2()
public Integer call(Integer i1, Integer i2)
return i1 + i2;


);
另外, Python 版的程序 如下:
python
import sys
from pyspark import SparkContext
file="inputfile.txt"
count=2
if __name__ == "__main__":
sc = SparkContext(appName="PythonWordCount")
lines = sc.textFile(file, 1)
counts = lines.flatMap(lambda x: x.split(\' \')) \\
.map(lambda x: (x, 1)) \\
.reduceByKey(lambda a, b: a + b) \\
.filter(lambda (a, b) : b >= count) \\
.flatMap(lambda (a, b): list(a)) \\
.map(lambda x: (x, 1)) \\
.reduceByKey(lambda a, b: a + b)
print ",".join(str(t) for t in counts.collect())
sc.stop()
编译
运行下面命令生成 jar:
bash
$ mvn package
运行成功之后,会在 target 目录生成 sparkwordcount-0.0.1-SNAPSHOT.jar 文件。
运行
因为项目依赖的 spark 版本是 1.2.0-cdh5.3.0 ,所以下面的命令只能在 CDH 5.3 集群上运行。
首先,将测试文件 inputfile.txt 上传到 HDFS 上;
bash
$ wget https://github.com/javachen/simplesparkapp/blob/master/data/inputfile.txt
$ hadoop fs -put inputfile.txt
其次,将 sparkwordcount-0.0.1-SNAPSHOT.jar 上传到集群中的一个节点;然后,使用 spark-submit 脚本运行 Scala 版的程序:
bash
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
或者,运行 Java 版本的程序:
bash
$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2
对于 Python 版的程序,运行脚本为:
bash
$ spark-submit --master local PythonWordCount.py
如果,你的集群部署的是 standalone 模式,则你可以替换 master 参数的值为 spark://<master host>:<master port> ,也可以以 Yarn 的模式运行。
最后的 Python 版的程序运行输出结果如下:
(u\'a\', 4),(u\'c\', 1),(u\'b\', 1),(u\'e\', 6),(u\'f\', 1),(u\'i\', 1),(u\'h\', 1),(u\'l\', 1),(u\'o\', 2),(u\'n\', 4),(u\'p\', 2),(u\'r\', 2),(u\'u\', 1),(u\'t\', 2),(u\'v\', 1)
参考技术A 几个基本概念: (1)job:包含多个task组成的并行计算,往往由action催生。 (2)stage:job的调度单位。 (3)task:被送到某个executor上的工作单元。 (4)taskSet:一组关联的,相互之间没有shuffle依赖关系的任务组成的任务集。 一个应用程序由一个driver program和多个job构成。一个job由多个stage组成。一个stage由多个没有shuffle关系的task组成。

CDH开启kerberos后在第三方机器上部署Spark程序问题解决

一、概述

 当CDH平台开启kerberos后,需要kdc服务验证通过和kerberos协议验证通过才可以。如果将spark程序部署在CDH 机器上,直接在生成keytab并使用principal登录kerberos即可。

如果当spark应用程序部署在第三方机器上时,还需要处理krb5.conf注册问题。

二、问题解决

 1、当spark程序部署在CDH所在的机器上时,启动spark程序命令如下

  1)进入kerberos

  kadmin.local

  2)、查看kerberos人员

  listprincs

  3)、生成keytab文件

  ktadd -k /home/demo/hive.keytab -norandkey hive@TEST.COM

4)、认证用户

kinit -kt /home/demo/hive.keytab hive@TEST.COM

5)、使用spark-submit提交

  spark-submit --class com.rdc.bdata.compute.ParquetCommonRunner --keytab /home/demo/keytab --principal hive/bdp5@TEST.COM --master local[2]  ... 其他参数设置。

2、当spark程序部署在CDH第三方机器上时

其他步骤与第一个场景一致,有以下两个不一致的地方

 1)、需要部署Spark环境

 下载spark客户端环境并部署

2)、需要下载hive的配置文件,并将文件拷贝到spark环境的conf目录

在CDH平台上下载hive客户端配置文件,并将hdfs-site.xml、hive-site.xml、core-site.xml、yarn-site.xml拷贝到spark的conf目录中

3)、将krb5.conf拷贝到%java_home%/lib/security下面

如果不拷贝会报如下异常:

java.lang.IllegalArgumentException: Can‘t get Kerberos realm,具体异常的原因为Caused by: KrbException: Cannot locate default realm

解决过程:

  1. 问题分析:

    Cannot locate default realm,顾名思义是没有设置default  realm,kerberos设置default  realm有两种方式

          (1)通过设置系统属性,代码如下

         System.setProperty("java.security.krb5.realm",""); 

         System.setProperty("java.security.krb5.kdc","");

          (2)读取kerberos配置文件配置,设置default  realm。指定kerberos配置文件逻辑大概如下

  1. getJavaFileName()方法:获取kerberos的配置文件地址:

该方式是通过读系统属性java.security.krb5.conf,如果为空的话则以%java_home%/lib/security/krb5.conf为配置文件

        2.getNativeFileName()方法,在不同操作系统去读逻辑有区别

         在Windows下是通过读取c:Windowskrb5.ini路径,如果该路径为空的话,则以c:winntkrb5.ini为主

         在linux下读取etc/krb5.conf

      设置kerberos配置文件代码如下:

package sun.security.krb5;
public class Config {
    private final String defaultRealm;
    private Config() throws KrbException {
        //....省略其他代码
        this.defaultRealm = getProperty("java.security.krb5.realm");
    if ((this.defaultKDC != null || this.defaultRealm == null) && (this.defaultRealm != null || this.defaultKDC == null)) {
        try {
            String var3 = this.getJavaFileName();
            List var2;
            if (var3 != null) {
                var2 = this.loadConfigFile(var3);
                this.stanzaTable = this.parseStanzaTable(var2);
                if (DEBUG) {
                    System.out.println("Loaded from Java config");
                }
            } else {
                boolean var4 = false;
                if (isMacosLionOrBetter()) {
                    try {
                        this.stanzaTable = SCDynamicStoreConfig.getConfig();
                        if (DEBUG) {
                            System.out.println("Loaded from SCDynamicStoreConfig");
                        }
 
                        var4 = true;
                    } catch (IOException var6) {
                    }
                }
 
                if (!var4) {
                    var3 = this.getNativeFileName();
                    var2 = this.loadConfigFile(var3);
                    this.stanzaTable = this.parseStanzaTable(var2);
                    if (DEBUG) {
                        System.out.println("Loaded from native config");
                    }
                }
            }
        } catch (IOException var7) {
        }
 
    } else {
        throw new KrbException("System property java.security.krb5.kdc and java.security.krb5.realm both must be set or neither must be set.");
    }
}
    private String getJavaFileName() {
        String var1 = getProperty("java.security.krb5.conf");
        if (var1 == null) {
            var1 = getProperty("java.home") + File.separator + "lib" + File.separator + "security" + File.separator + "krb5.conf";
            if (!this.fileExists(var1)) {
                var1 = null;
            }
        }
        
        if (DEBUG) {
            System.out.println("Java config name: " + var1);
        }
        
        return var1;
    }
    //其他代码省略
}
 

  因为,使用spark-submit之前就需要获取krb5.conf参数,因此需要使用getJavaFileName()方法:将krb5.conf拷贝到jdk所在的目录中。

 

 

  

以上是关于如何在CDH5上运行Spark应用的主要内容,如果未能解决你的问题,请参考以下文章

如何在CDH5上运行Spark应用

如何在CDH5上运行Spark应用

CDH5.12.0 如何升级到Spark2.0 版本

如何使用Spark SQL 的JDBC server

如何在 CDH 5.4.4 上从 Spark 查询 Hive

0644-5.16.1-如何在CDH5中使用Spark2.4 Thrift