Operating Kudu with Spark
Posted by 杀智勇双全杀
Overview
Spark is a staple of big-data work, and Spark itself is written in Scala, so knowing only the Java client API is not enough; you should also be able to drive Kudu from Scala. This post is not aimed at complete beginners, so let's go straight to the code.
Maven Dependencies
<!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>
<!-- Version properties -->
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.0-cdh6.2.1</spark.version>
    <hadoop.version>3.0.0-cdh6.2.1</hadoop.version>
    <kudu.version>1.9.0-cdh6.2.1</kudu.version>
</properties>
<!-- Dependencies -->
<dependencies>
    <!-- Kudu client tools -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- Kudu client -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- JUnit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <!-- Kudu-Spark integration: https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- Scala library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Compiler plugins for Java and Scala -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
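For quick experiments outside a Maven project, the same connector can also be pulled in when launching the Spark shell. A minimal sketch, using the kudu-spark2 coordinates from the POM above and assuming the Cloudera repository is reachable from your machine:

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.1 \
    --repositories https://repository.cloudera.com/artifactory/cloudera-repos/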
Operating on Kudu Tables
package com.aa.kudu.table

import java.util

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Kudu-Spark integration: create and drop Kudu tables with KuduContext
object KuduSparkTableDemo {

  def createKuduTable(tableName: String, kuduContext: KuduContext): Unit = {
    // Schema of the table
    val schema: StructType = StructType(
      Array(
        StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true),
        StructField("gender", StringType, nullable = true)
      )
    )
    // Primary key of the table
    val keys: Seq[String] = Seq("id")
    // Table options: 1 replica, 3 hash partitions on the id column
    val options: CreateTableOptions = new CreateTableOptions()
    options.setNumReplicas(1)
    options.addHashPartitions(util.Arrays.asList("id"), 3)
    val kuduTable = kuduContext.createTable(tableName, schema, keys, options)
    println("Kudu Table ID: " + kuduTable.getTableId)
  }

  def dropKuduTable(tableName: String, kuduContext: KuduContext): Unit = {
    // Drop the table only if it exists
    if (kuduContext.tableExists(tableName)) {
      kuduContext.deleteTable(tableName)
    }
  }

  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // TODO: create the KuduContext (pass the Kudu master address)
    val kuduContext: KuduContext = new KuduContext("192.168.88.20", spark.sparkContext)
    println(s"KuduContext: ${kuduContext}")
    // Create the table
    //createKuduTable("kudu_aa_users", kuduContext)
    // Drop the table
    //dropKuduTable("kudu_aa_users", kuduContext)
    // Release resources when the application ends
    spark.stop()
  }
}
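The table above is hash-partitioned on id. CreateTableOptions also supports range partitioning; the sketch below is not from the original post (the table name kudu_aa_users_range is made up) and creates a single unbounded range partition on the primary-key column:

import java.util
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def createRangeKuduTable(tableName: String, kuduContext: KuduContext): Unit = {
  // Shortened two-column schema, just for the sketch
  val schema = StructType(Array(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true)
  ))
  val options = new CreateTableOptions()
  options.setNumReplicas(1)
  // One unbounded range partition covering all id values
  options.setRangePartitionColumns(util.Arrays.asList("id"))
  kuduContext.createTable(tableName, schema, Seq("id"), options)
}
// Usage (hypothetical table name):
// createRangeKuduTable("kudu_aa_users_range", kuduContext)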
Operating on Kudu Data
package com.aa.kudu.data

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// CRUD operations on the data in a Kudu table
object KuduSparkDataDemo {

  // Insert data
  def insertData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
    // Mock some data: toDF names the columns, turning a Seq of tuples into a DataFrame
    val usersDF: DataFrame = spark.createDataFrame(
      Seq(
        (1001, "张三", 23, "男"),
        (1002, "李四", 22, "男"),
        (1003, "王五", 24, "女"),
        (1004, "马六", 33, "男")
      )
    ).toDF("id", "name", "age", "gender")
    // Save the data into the Kudu table
    kuduContext.insertRows(usersDF, tableName)
  }

  // Query data
  def selectData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
    // Columns to project
    val columnProjection: Seq[String] = Seq("id", "name", "age")
    val datasRDD: RDD[Row] = kuduContext.kuduRDD(
      spark.sparkContext, tableName, columnProjection
    )
    // Print the rows, partition by partition
    datasRDD.foreachPartition { iter =>
      iter.foreach { row =>
        println(s"p-${TaskContext.getPartitionId()}: id = ${row.getInt(0)}" +
          s", name = ${row.getString(1)}, age = ${row.getInt(2)}")
      }
    }
  }

  // Update data
  def updateData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
    val usersDF: DataFrame = spark.createDataFrame(
      Seq(
        (1001, "zhangsan22", 24, "男"),
        (1002, "lisi22", 23, "男"),
        (1003, "xiaohong11", 24, "女"),
        (1004, "zhaoliu244", 33, "男")
      )
    ).toDF("id", "name", "age", "gender")
    // Rows that exist are updated; a missing row raises an error
    kuduContext.updateRows(usersDF, tableName)
  }

  // Upsert data
  def upsertData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
    // Mock some data
    val usersDF: DataFrame = spark.createDataFrame(
      Seq(
        (1001, "zhangsa风", 24, "男"),
        (1006, "tianqi", 33, "男")
      )
    ).toDF("id", "name", "age", "gender")
    // Rows that exist are updated; missing rows are inserted
    kuduContext.upsertRows(usersDF, tableName)
  }

  // Delete data
  def deleteData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
    // Mock the keys to delete
    import spark.implicits._
    val usersDF: DataFrame = spark.sparkContext.parallelize(List(1006)).toDF("id")
    // Delete rows by primary key (id)
    kuduContext.deleteRows(usersDF, tableName)
  }

  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // TODO: create the KuduContext
    val kuduContext: KuduContext = new KuduContext("192.168.88.20", spark.sparkContext)
    println(s"kuduContext: ${kuduContext}")
    val tableName = "kudu_aa_users"
    // Insert data
    insertData(spark, kuduContext, tableName)
    // Query data
    //selectData(spark, kuduContext, tableName)
    // Update data
    //updateData(spark, kuduContext, tableName)
    // Upsert data
    //upsertData(spark, kuduContext, tableName)
    // Delete data
    //deleteData(spark, kuduContext, tableName)
    // Release resources when the application ends
    spark.stop()
  }
}
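kuduRDD returns untyped Rows; for most jobs it is easier to load the table as a DataFrame, which also lets the connector push column projections and supported filter predicates down to the Kudu scanners. A minimal read sketch against the same table, reusing the master address from the code above and assuming a SparkSession named spark:

// Sketch: DataFrame scan of kudu_aa_users with projection and filter pushdown
import spark.implicits._
val usersDF = spark.read
  .format("kudu")
  .option("kudu.master", "192.168.88.20:7051")
  .option("kudu.table", "kudu_aa_users")
  .load()
// select() and filter() are pushed down to Kudu where possible
usersDF.select("id", "name", "age").filter($"age" > 23).show(truncate = false)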
Operating Kudu with Spark SQL
package com.aa.kudu.sql

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object KuduSparkSQLDemo {

  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    import spark.implicits._

    // TODO: load data from the Kudu table
    val kuduDF: DataFrame = spark.read
      .format("kudu")
      .option("kudu.table", "kudu_aa_users")
      .option("kudu.master", "192.168.88.20:7051")
      .load()
    kuduDF.printSchema()
    kuduDF.show(1, truncate = false)

    // TODO: define a UDF that maps the gender column to M/F
    val gender_to_udf: UserDefinedFunction = udf(
      (gender: String) => {
        gender match {
          case "男" => "M"
          case "女" => "F"
          case _ => "?"
        }
      }
    )

    // TODO: apply the UDF
    val etlDF: DataFrame = kuduDF.select(
      $"id", $"name",
      $"age".plus(1).as("age"),
      gender_to_udf($"gender").as("gender")
    )
    etlDF.printSchema()
    etlDF.show(10, truncate = false)

    // TODO: save the result back to a Kudu table
    // (the original post is truncated here; the option values below are an assumed completion)
    etlDF.write
      .mode(SaveMode.Append)
      .format("kudu")
      .option("kudu.table", "kudu_aa_users")
      .option("kudu.master", "192.168.88.20:7051")
      .save()

    spark.stop()
  }
}