Spark Case Library V1.0

Posted by ChinaManor

Spark Case Library

Case 1: Word-frequency counting with Spark RDDs

pom.xml

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>

<dependencies>
    <!-- Scala language dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven compiler plugins -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
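With this build section, mvn compile runs the maven-compiler-plugin on the Java sources and the scala-maven-plugin on the Scala sources, and mvn package then produces the application jar. Since the examples below hard-code the master as local[2], they can also be run directly from the IDE without a cluster.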

Implementation

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
	
	def main(args: Array[String]): Unit = {
		// Create the SparkContext instance; first build a SparkConf with the application's basic settings
		val sc: SparkContext = {
			// First, build the SparkConf object and set the application name and master
			val sparkConf: SparkConf = new SparkConf()
				.setAppName("SparkWordCount")
				.setMaster("local[2]")
			// Second, create the SparkContext instance, passing in sparkConf
			new SparkContext(sparkConf)
		}
		
		// Step 1: read the file with sc.textFile, wrapping the data in an RDD
		val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
		
		// Step 2: transform the data with the RDD higher-order functions flatMap, map and reduceByKey
		val resultRDD: RDD[(String, Int)] = inputRDD
			// split each line into words on whitespace
			.flatMap(line => line.split("\\s+"))
			// map each word to a tuple (word, 1), i.e. one occurrence
			.map(word => word -> 1)
			// group by word and reduce within each group by summing the counts
			.reduceByKey((tmp, item) => tmp + item)
		// Step 3: print the final result to the console
		resultRDD.foreach(tuple => println(tuple))
		// The application is done; release resources
		sc.stop()
	}
}
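For a quick sanity check, a hypothetical /datas/wordcount.data with the two lines

	spark hadoop spark
	hadoop spark flink

would print the following tuples (the order is not deterministic, since foreach runs per partition):

	(spark,3)
	(hadoop,2)
	(flink,1)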

Case 2: WordCount, taking the Top 3 words by descending frequency

pom.xml

Identical to the pom.xml shown in Case 1.

Implementation

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkTopKey {
	
	def main(args: Array[String]): Unit = {
		// Create the SparkContext instance; first build a SparkConf with the application's basic settings
		val sc: SparkContext = {
			// First, build the SparkConf object and set the application name and master
			val sparkConf: SparkConf = new SparkConf()
				.setAppName("SparkTopKey")
				.setMaster("local[2]")
			// Second, create the SparkContext instance, passing in sparkConf
			new SparkContext(sparkConf)
		}
		// Step 1: read the file with sc.textFile, wrapping the data in an RDD
		val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
		// Step 2: transform the data with the RDD higher-order functions flatMap, map and reduceByKey
		val resultRDD: RDD[(String, Int)] = inputRDD
			// split each line into words on whitespace
			.flatMap(line => line.split("\\s+"))
			// map each word to a tuple (word, 1), i.e. one occurrence
			.map(word => word -> 1)
			// group by word and reduce within each group by summing the counts
			.reduceByKey((tmp, item) => tmp + item)
		resultRDD
			// sort by count in descending order
			.sortBy(tuple => tuple._2, ascending = false)
			// take the top 3 and print them
			.take(3)
			.foreach(tuple => println(tuple))
		// The application is done; release resources
		sc.stop()
	}
	
}
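A note on this approach: sortBy performs a full sort of the RDD only to keep three records. For a small K, RDD.top reaches the same result with a bounded priority queue per partition and no full sort; a minimal equivalent sketch of the last step:

	resultRDD
		.top(3)(Ordering.by((tuple: (String, Int)) => tuple._2))
		.foreach(tuple => println(tuple))

top returns an Array on the driver, so the printing here also happens on the driver in sorted order rather than per executor.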

Case 3: Building an RDD from a Seq collection with parallelize, then counting word frequencies

pom.xml

Identical to the pom.xml shown in Case 1.
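Implementation

The implementation code for this case is missing from the source. Below is a minimal sketch consistent with the case title: the word-count pipeline is the same as in Case 1, but the input RDD is built with sc.parallelize from an in-memory Seq. The object name and the sample lines are placeholders, not from the original.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkParallelizeTest {
	
	def main(args: Array[String]): Unit = {
		// Create the SparkContext instance, as in Case 1
		val sc: SparkContext = {
			val sparkConf: SparkConf = new SparkConf()
				.setAppName("SparkParallelizeTest")
				.setMaster("local[2]")
			new SparkContext(sparkConf)
		}
		// Build the dataset as an in-memory Seq of lines (placeholder content)
		val linesSeq: Seq[String] = Seq(
			"hadoop scala hive spark scala sql",
			"hadoop scala spark hdfs hive spark",
			"spark hdfs spark hdfs scala hive"
		)
		// Parallelize the Seq into an RDD, here with 2 partitions
		val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
		// Same word-count pipeline as Case 1
		val resultRDD: RDD[(String, Int)] = inputRDD
			.flatMap(line => line.split("\\s+"))
			.map(word => word -> 1)
			.reduceByKey((tmp, item) => tmp + item)
		resultRDD.foreach(tuple => println(tuple))
		// The application is done; release resources
		sc.stop()
	}
}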