[Spark] Spark Core Fundamentals

Posted by 魏晓蕾


Computing PV (page views) with the Spark Core API

package org.example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.eclipse.jetty.util.{MultiMap, UrlEncoded}

object ComputePV {

  // Count page views (PV) per product: one hit for every log line that carries a product_id.
  def computePV(textRDD: RDD[String]): RDD[(String, Long)] = {
    // Each log line is tab-separated; field 1 holds the URL query string.
    val splitTextFileRDD = textRDD.map(_.split("\t"))
    val result = splitTextFileRDD
      .filter(log => log(1).contains("product_id"))
      .map(log => {
        // Parse the query string with Jetty's UrlEncoded helper.
        val paramsMap = new MultiMap[String]
        UrlEncoded.decodeTo(log(1), paramsMap, "UTF-8")
        val productId = paramsMap.getValue("product_id", 0)
        (productId, 1L)
      })
      .reduceByKey(_ + _)
    result
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: <input_file> <output_file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("ComputePV")
    val sc = new SparkContext(conf)
    val textFileRDD = sc.textFile(args(0))
    val result = computePV(textFileRDD)
    result.map(r => r._1 + "\t" + r._2).saveAsTextFile(args(1))
    sc.stop()
  }
}
  
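The job above assumes tab-separated access logs in which field 1 carries the URL query string (e.g. product_id=1001&channel=app). Below is a minimal local sketch for exercising computePV; the sample log lines, the local[*] master, and the ComputePVLocalTest object are illustrative assumptions, not part of the original job:

package org.example

import org.apache.spark.{SparkConf, SparkContext}

object ComputePVLocalTest {
  def main(args: Array[String]): Unit = {
    // Local master purely for experimentation (assumed, not from the original post).
    val sc = new SparkContext(new SparkConf().setAppName("ComputePVLocalTest").setMaster("local[*]"))
    // Hypothetical log lines: tab-separated, field 1 is the query string, field 3 is "uid=...".
    val sampleLogs = sc.parallelize(Seq(
      "2021-01-01 10:00:00\tproduct_id=1001&channel=app\t200\tuid=u01",
      "2021-01-01 10:00:05\tproduct_id=1001&channel=web\t200\tuid=u01",
      "2021-01-01 10:00:09\tproduct_id=1002&channel=app\t200\tuid=u02"
    ))
    // Expected PV counts: (1001,2), (1002,1)
    ComputePV.computePV(sampleLogs).collect().foreach(println)
    sc.stop()
  }
}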

Computing UV (unique visitors) with the Spark Core API

package org.example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.eclipse.jetty.util.{MultiMap, UrlEncoded}

object ComputeUV {

  // Count unique visitors (UV) per product: deduplicate (product_id, uid) pairs first,
  // so a user hitting the same product repeatedly is counted once.
  def computeUV(textRDD: RDD[String]): RDD[(String, Long)] = {
    // Each log line is tab-separated; field 1 holds the query string, field 3 the "uid=..." token.
    val splitTextFileRDD = textRDD.map(_.split("\t"))
    val result = splitTextFileRDD
      .filter(log => log(1).contains("product_id") && log(3).contains("uid"))
      .map(log => {
        // Parse the query string with Jetty's UrlEncoded helper.
        val paramsMap = new MultiMap[String]
        UrlEncoded.decodeTo(log(1), paramsMap, "UTF-8")
        val productId = paramsMap.getValue("product_id", 0)
        // Field 3 looks like "uid=<user id>".
        val uid = log(3).split("=")(1)
        (productId, uid)
      })
      .distinct()
      .map(productIdWithUid => (productIdWithUid._1, 1L))
      .reduceByKey(_ + _)
    result
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: <input_file> <output_file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("ComputeUV")
    val sc = new SparkContext(conf)
    val textFileRDD = sc.textFile(args(0))
    val result = computeUV(textFileRDD)
    result.map(r => r._1 + "\t" + r._2).saveAsTextFile(args(1))
    sc.stop()
  }
}

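The only difference from the PV job is the distinct() applied to (product_id, uid) pairs before counting. A tiny local sketch of just that dedup-then-count step; the sample pairs, the local[*] master, and the DistinctUVSketch object are illustrative assumptions:

package org.example

import org.apache.spark.{SparkConf, SparkContext}

object DistinctUVSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistinctUVSketch").setMaster("local[*]"))
    // uid u01 viewed product 1001 twice: PV would count it twice, UV only once.
    val pairs = sc.parallelize(Seq(("1001", "u01"), ("1001", "u01"), ("1001", "u02"), ("1002", "u01")))
    pairs.distinct()                                  // drop duplicate (product_id, uid) pairs
      .map { case (productId, _) => (productId, 1L) } // one count per distinct visitor
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)                               // expected: (1001,2), (1002,1)
    sc.stop()
  }
}
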
pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>GuPaoStudySparkCoreDemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful Scala app</description>
  <inceptionYear>2010</inceptionYear>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>nexus-aliyun</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </repository>
  </repositories>
<!--
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
-->
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib-local_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>9.4.33.v20201020</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.11</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>4.3.11.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive-thriftserver_2.11</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.5.1</version>
    </dependency>
    <dependency>
      <groupId>com.redislabs</groupId>
      <artifactId>spark-redis</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.1.20</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-make:transitive</arg>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.5.0</version>
        <executions>
          <execution>
            <id>run-local</id>
            <goals>
              <goal>exec</goal>
            </goals>
            <configuration>
              <executable>spark-submit</executable>
              <arguments>
                <argument>--master</argument>
                <argument>local</argument>
                <argument>${project.build.directory}/${project.artifactId}-${project.version}-uber.jar</argument>
              </arguments>
            </configuration>
          </execution>
          <execution>
            <id>run-yarn</id>
            <goals>
              <goal>exec</goal>
            </goals>
            <configuration>
              <environmentVariables>
                <HADOOP_CONF_DIR>
                  ${basedir}/spark-remote/conf
                </HADOOP_CONF_DIR>
              </environmentVariables>
              <executable>spark-submit</executable>
              <arguments>
                <argument>--master</argument>
                <argument>yarn</argument>
                <argument>${project.build.directory}/${project.artifactId}-${project.version}-uber.jar</argument>
              </arguments>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>