Spark Core Fundamentals
Posted by 魏晓蕾
Computing PV with the Spark Core API
package org.example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.eclipse.jetty.util.{MultiMap, UrlEncoded}

object ComputePV {

  // Count page views (PV) per product_id from tab-separated access-log lines.
  def computePV(textRDD: RDD[String]): RDD[(String, Long)] = {
    val splitTextFileRDD = textRDD.map(_.split("\t"))
    val result = splitTextFileRDD
      .filter(log => log(1).contains("product_id"))
      .map { log =>
        // The second field is a URL-encoded query string; parse it with Jetty's UrlEncoded.
        val paramsMap = new MultiMap[String]
        UrlEncoded.decodeTo(log(1), paramsMap, "UTF-8")
        val productId = paramsMap.getValue("product_id", 0)
        (productId, 1L)
      }
      .reduceByKey(_ + _)
    result
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: <input_file> <output_file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("ComputePV")
    val sc = new SparkContext(conf)
    val textFileRDD = sc.textFile(args(0))
    val result = computePV(textFileRDD)
    result.map(r => r._1 + "\t" + r._2).saveAsTextFile(args(1))
    sc.stop()
  }
}
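Both jobs assume tab-separated access-log lines whose second field is a URL-encoded query string containing product_id and whose fourth field carries uid=&lt;user id&gt;; those positions are implied by the indices used above. As a quick sanity check, here is a minimal local sketch (not part of the original post; the sample lines are fabricated purely to match that layout) that runs computePV on an in-memory RDD:

package org.example

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local driver for a quick check of ComputePV.computePV.
// The sample log lines below are made up for illustration only; they just
// mirror the tab-separated layout the job indexes into (field 1 = query string).
object ComputePVLocalCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ComputePVLocalCheck").setMaster("local[2]"))
    val sampleLogs = sc.parallelize(Seq(
      "2020-11-01 10:00:00\tproduct_id=1001&channel=app\t200\tuid=u001",
      "2020-11-01 10:00:01\tproduct_id=1001&channel=web\t200\tuid=u002",
      "2020-11-01 10:00:02\tproduct_id=1002&channel=app\t200\tuid=u001"
    ))
    // Every matching line counts once, so the expected output is (1001,2) and (1002,1).
    ComputePV.computePV(sampleLogs).collect().foreach(println)
    sc.stop()
  }
}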
Computing UV with the Spark Core API
package org.example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.eclipse.jetty.util.{MultiMap, UrlEncoded}

object ComputeUV {

  // Count unique visitors (UV) per product_id: deduplicate (product_id, uid)
  // pairs first, then count how many distinct uids each product has.
  def computeUV(textRDD: RDD[String]): RDD[(String, Long)] = {
    val splitTextFileRDD = textRDD.map(_.split("\t"))
    val result = splitTextFileRDD
      .filter(log => log(1).contains("product_id") && log(3).contains("uid"))
      .map { log =>
        val paramsMap = new MultiMap[String]
        UrlEncoded.decodeTo(log(1), paramsMap, "UTF-8")
        val productId = paramsMap.getValue("product_id", 0)
        // The fourth field looks like "uid=<value>"; take the value part.
        val uid = log(3).split("=")(1)
        (productId, uid)
      }
      .distinct()
      .map(productIdWithUid => (productIdWithUid._1, 1L))
      .reduceByKey(_ + _)
    result
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: <input_file> <output_file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("ComputeUV")
    val sc = new SparkContext(conf)
    val textFileRDD = sc.textFile(args(0))
    val result = computeUV(textFileRDD)
    result.map(r => r._1 + "\t" + r._2).saveAsTextFile(args(1))
    sc.stop()
  }
}
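UV differs from PV only in the deduplication step: (product_id, uid) pairs are made distinct before counting, so repeat visits by the same user count once per product. A matching local sketch, again with fabricated sample lines, shows the effect:

package org.example

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local driver for ComputeUV.computeUV, reusing the same
// made-up line layout as the PV sketch above.
object ComputeUVLocalCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ComputeUVLocalCheck").setMaster("local[2]"))
    val sampleLogs = sc.parallelize(Seq(
      "2020-11-01 10:00:00\tproduct_id=1001&channel=app\t200\tuid=u001",
      "2020-11-01 10:00:01\tproduct_id=1001&channel=app\t200\tuid=u001", // same uid again
      "2020-11-01 10:00:02\tproduct_id=1001&channel=web\t200\tuid=u002"
    ))
    // PV for 1001 would be 3, but UV deduplicates uids, so the expected output is (1001,2).
    ComputeUV.computeUV(sampleLogs).collect().foreach(println)
    sc.stop()
  }
}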
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>GuPaoStudySparkCoreDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>My wonderful scala app</description>
<inceptionYear>2010</inceptionYear>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
</properties>
<repositories>
<repository>
<id>nexus-aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
</repositories>
<!--
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
-->
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib-local_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>9.4.33.v20201020</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.20</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!-- -make:transitive was removed in Scala 2.11, so it is dropped here -->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.5.0</version>
<executions>
<execution>
<id>run-local</id>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>spark-submit</executable>
<arguments>
<argument>--master</argument>
<argument>local</argument>
<argument>${project.build.directory}/${project.artifactId}-${project.version}-uber.jar</argument>
</arguments>
</configuration>
</execution>
<execution>
<id>run-yarn</id>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<environmentVariables>
<HADOOP_CONF_DIR>
${basedir}/spark-remote/conf
</HADOOP_CONF_DIR>
</environmentVariables>
<executable>spark-submit</executable>
<arguments>
<argument>--master</argument>
<argument>yarn</argument>
<argument>${project.build.directory}/${project.artifactId}-${project.version}-uber.jar</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
<plugin>