Operating Kudu with Spark

Operating Kudu with the Spark API

Overview

Spark is a staple of big-data work, and Spark itself is written in Scala, so knowing only the Java client API is not enough: you also need to drive Kudu from Scala. This isn't written for complete beginners, so let's go straight to the code.

Maven Dependencies

    <!-- Repository locations: the aliyun, cloudera, and jboss repositories -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <!-- Version properties -->
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.0-cdh6.2.1</spark.version>
        <hadoop.version>3.0.0-cdh6.2.1</hadoop.version>
        <kudu.version>1.9.0-cdh6.2.1</kudu.version>
    </properties>

    <!-- Dependencies -->
    <dependencies>

        <!-- Kudu client tools dependency -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <!-- Kudu client dependency -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <!-- JUnit dependency -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
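
Before wiring Kudu into Spark, it is worth confirming that the kudu-client dependency resolves and that the master is reachable. A minimal sketch using only the plain Java client (the package and object names are made up; the master address matches the one used throughout this post):

package com.aa.kudu.check

import org.apache.kudu.client.KuduClient

// Quick connectivity check against the Kudu master via the plain Java client
object KuduConnectivityCheck {
  def main(args: Array[String]): Unit = {
    // Build a synchronous client; 7051 is the default master RPC port
    val client = new KuduClient.KuduClientBuilder("192.168.88.20:7051").build()
    try {
      // getTablesList returns a ListTablesResponse; its getTablesList yields the table names
      println(s"Tables on master: ${client.getTablesList.getTablesList}")
    } finally {
      client.close()
    }
  }
}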

Creating and Dropping Kudu Tables

package com.aa.kudu.table

import java.util

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.client.CreateTableOptions
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Kudu + Spark integration: create and drop tables with KuduContext
object KuduSparkTableDemo {

  def createKuduTable(tableName:String,kuduContext:KuduContext):Unit = {
    // Schema of the table
    val schema:StructType = StructType(
      Array(
        StructField("id",IntegerType,nullable = false),
        StructField("name",StringType,nullable = true),
        StructField("age",IntegerType,nullable = true),
        StructField("gender",StringType,nullable = true)
      )
    )
    // Primary key column(s)
    val keys: Seq[String] = Seq("id")
    // Table options: replication factor and hash partitioning on id into 3 buckets
    val options:CreateTableOptions = new CreateTableOptions()
    options.setNumReplicas(1)
    options.addHashPartitions(util.Arrays.asList("id"),3)

    val kuduTable = kuduContext.createTable(tableName,schema,keys,options)
    println("Kudu Table ID: " + kuduTable.getTableId)
  }

  def dropKuduTable(tableName:String,kuduContext: KuduContext): Unit ={
    // Drop the table only if it exists
    if(kuduContext.tableExists(tableName)){
      kuduContext.deleteTable(tableName)
    }
  }

  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark:SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions","2")
      .getOrCreate()
    import spark.implicits._

    // TODO: create the KuduContext object
    val kuduContext:KuduContext = new KuduContext("192.168.88.20",spark.sparkContext)
    println(s"KuduContext: ${kuduContext}")

    // Create the table
    //createKuduTable("kudu_aa_users",kuduContext)

    // Drop the table
    //dropKuduTable("kudu_aa_users",kuduContext)

    // Application done; release resources
    spark.stop()
  }

}
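
After creating the table, you can double-check what was actually built (schema, replication) through the synchronous Java client that KuduContext exposes. A minimal sketch, assuming kudu-spark 1.9's public syncClient accessor (the object name is made up):

package com.aa.kudu.table

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession

object KuduTableInspect {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KuduTableInspect")
      .master("local[2]")
      .getOrCreate()
    val kuduContext = new KuduContext("192.168.88.20", spark.sparkContext)

    // Open the table via the underlying synchronous Java client
    val table = kuduContext.syncClient.openTable("kudu_aa_users")
    println(s"Table ID: ${table.getTableId}")
    println(s"Schema:   ${table.getSchema}")
    println(s"Replicas: ${table.getNumReplicas}")

    spark.stop()
  }
}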

CRUD Operations on Kudu Data

package com.aa.kudu.data

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// CRUD operations on the data in a Kudu table
object KuduSparkDataDemo {

  //  Insert data
  def insertData(spark:SparkSession,kuduContext: KuduContext,tableName:String) ={
    // Mock some data: Seq of tuples -> DataFrame via toDF with column names
    val userDF:DataFrame = spark.createDataFrame(
      Seq(
        (1001, "张三", 23, "男"),
        (1002, "李四", 22, "男"),
        (1003, "王五", 24, "女"),
        (1004, "马六", 33, "男")
      )
    ).toDF("id","name","age","gender")

    // Save the data into the Kudu table
    kuduContext.insertRows(userDF,tableName)
  }

  //  Query data
  def selectData(spark:SparkSession, kuduContext:KuduContext, tableName:String) ={
    // Column projection: only fetch these columns
    val columnProjection:Seq[String] = Seq("id","name","age")
    val datasRDD:RDD[Row] = kuduContext.kuduRDD(
      spark.sparkContext,tableName,columnProjection
    )

    // Print the rows, partition by partition
    datasRDD.foreachPartition{iter =>
      iter.foreach{row =>
        println(s"p-${TaskContext.getPartitionId()}: id = ${row.getInt(0)}" +
          s", name = ${row.getString(1)}, age = ${row.getInt(2)}")
      }
    }
  }

  //  Update data
  def updateData(spark:SparkSession, kuduContext:KuduContext, tableName:String) ={
    val usersDF:DataFrame = spark.createDataFrame(
      Seq(
        (1001, "zhangsan22", 24, "男"),
        (1002, "lisi22", 23, "男"),
        (1003, "xiaohong11", 24, "女"),
        (1004, "zhaoliu244", 33, "男")
      )
    ).toDF("id","name","age","gender")

    // Rows whose key exists are updated; a missing key raises an error
    kuduContext.updateRows(usersDF,tableName)
  }

  //  Upsert data
  def upsertData(spark:SparkSession, kuduContext:KuduContext, tableName:String) ={
    // Mock some data
    val usersDF:DataFrame = spark.createDataFrame(
      Seq(
        (1001, "zhangsa风", 24, "男"),
        (1006, "tianqi", 33, "男")
      )
    ).toDF("id","name","age","gender")

    // Rows whose key exists are updated; missing keys are inserted
    kuduContext.upsertRows(usersDF,tableName)
  }

  //  Delete data
  def deleteData(spark:SparkSession, kuduContext:KuduContext, tableName:String) = {
    // Mock the keys to delete
    import spark.implicits._
    val usersDF:DataFrame = spark.sparkContext.parallelize(List(1006)).toDF("id")

    // Delete rows by id (primary key)
    kuduContext.deleteRows(usersDF,tableName)
  }

  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark:SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions","2")
      .getOrCreate()
    import spark.implicits._

    // TODO: create the KuduContext object
    val kuduContext:KuduContext = new KuduContext("192.168.88.20",spark.sparkContext)
    println(s"kuduContext: ${kuduContext}")

    val tableName = "kudu_aa_users"

    // Insert data
    insertData(spark,kuduContext,tableName)

    // Query data
    //selectData(spark, kuduContext, tableName)

    // Update data
    //updateData(spark, kuduContext, tableName)

    // Upsert data
    //upsertData(spark, kuduContext, tableName)

    // Delete data
    //deleteData(spark, kuduContext, tableName)

    // Application done; release resources
    spark.stop()
  }
}
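
Note that insertRows is strict: running insertData a second time fails the job with duplicate-key errors. kudu-spark also accepts a KuduWriteOptions argument for this case; a minimal sketch assuming the ignoreDuplicateRowErrors flag available in kudu-spark 1.8+ (verify against your exact version; the object name is made up):

package com.aa.kudu.data

import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}
import org.apache.spark.sql.SparkSession

object KuduInsertIgnoreDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KuduInsertIgnoreDemo")
      .master("local[2]")
      .getOrCreate()
    val kuduContext = new KuduContext("192.168.88.20", spark.sparkContext)

    val userDF = spark.createDataFrame(
      Seq((1001, "张三", 23, "男"))
    ).toDF("id", "name", "age", "gender")

    // ignoreDuplicateRowErrors = true turns "key already present" into a no-op
    // instead of failing the whole write (insert-ignore semantics)
    kuduContext.insertRows(userDF, "kudu_aa_users",
      new KuduWriteOptions(ignoreDuplicateRowErrors = true))

    spark.stop()
  }
}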

Operating Kudu with Spark SQL

package com.aa.kudu.sql

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object KuduSparkSQLDemo {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark:SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions",2)
      .getOrCreate()
    import spark.implicits._

    // TODO: load data from the Kudu table
    val kuduDF:DataFrame = spark.read
      .format("kudu")
      .option("kudu.table","kudu_aa_users")
      .option("kudu.master","192.168.88.20:7051")
      .load()
    kuduDF.printSchema()
    kuduDF.show(1,truncate = false)

    // TODO: define a UDF that maps gender to M/F
    val gender_to_udf:UserDefinedFunction = udf(
      (gender:String) =>{
       gender match{
         case "男" => "M"
         case "女" => "F"
         case _ => "?"
       }
      }
    )

    // TODO: apply the UDF
    val etlDF:DataFrame = kuduDF.select(
      $"id",$"name",
      $"age".plus(1).as("age"),
      gender_to_udf($"gender").as("gender")
    )
    etlDF.printSchema()
    etlDF.show(10,truncate = false)

    // TODO: save back to the Kudu table
    etlDF.write
      .mode(SaveMode.Append)
      .format("kudu")
      .option("kudu.table", "kudu_aa_users")
      .option("kudu.master", "192.168.88.20:7051")
      .save()

    // Application done; release resources
    spark.stop()
  }
}
