客快物流大数据项目(四十五):Spark操作Kudu DML操作
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(四十五):Spark操作Kudu DML操作相关的知识,希望对你有一定的参考价值。
Spark操作Kudu DML操作
Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成. 包括:
- INSERT - 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。 使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT将不允许插入行(导致失败)。相反,我们鼓励使用下面描述 的INSERT_IGNORE。
- INSERT-IGNORE - 将DataFrame的行插入Kudu表。如果表存在,则忽略插入动作。
- DELETE - 从Kudu表中删除DataFrame中的行
- UPSERT - 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。
- UPDATE - 更新dataframe中的行
一、插入数据insert操作
先创建一张表,然后把数据插入到表中
package cn.it
import java.util
import cn.it.SparkKuduDemo.TABLE_NAME, it
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
object SparkKuduTest
//定义样例类
case class person(id:Int, name:String, age:Int, sex:Int)
def main(args: Array[String]): Unit =
//构建sparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
//构建SparkSession对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//获取sparkContext对象
val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("warn")
//构建KuduContext对象
val kuduContext = new KuduContext("node2:7051", sc)
//1.创建表操作
createTable(kuduContext)
/**
* 创建表
*
* @param kuduContext
* @return
*/
def createTable(kuduContext: KuduContext) =
//如果表不存在就去创建
if (!kuduContext.tableExists(TABLE_NAME))
//构建创建表的表结构信息,就是定义表的字段和类型
val schema: StructType = StructType(
StructField("userId", StringType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("sex", StringType, false) :: Nil)
//指定表的主键字段
val keys = List("userId")
//指定创建表所需要的相关属性
val options: CreateTableOptions = new CreateTableOptions
//定义分区的字段
val partitionList = new util.ArrayList[String]
partitionList.add("userId")
//添加分区方式为hash分区
options.addHashPartitions(partitionList, 6)
//创建表
kuduContext.createTable(TABLE_NAME, schema, keys, options)
/**
* 2)加载数据
* @param session
* @param sc
* @param kuduContext
*/
def inserData(session: SparkSession, sc: SparkContext, kuduContext: KuduContext): Unit =
//定义数据
val data = List(person(1, "tom", 30, 1), person(2, "mark", 26, 0))
val personRDD = sc.makeRDD(data)
import session.implicits._
val dataFrame: DataFrame = personRDD.toDF
kuduContext.insertRows(dataFrame, TABLE_NAME)
二、删除数据delete操作
/**
* 4)删除数据
* @param session
* @param kuduContext
*/
def deleteData(session: SparkSession, kuduContext: KuduContext): Unit =
//定义数据
val data = List(person(1, "tom", 50, 1), person(2, "mark", 30, 0))
import session.implicits._
val dataFrame: DataFrame = data.toDF().select("id")
kuduContext.deleteRows(dataFrame, TABLE_NAME)
三、更新数据upsert操作
/**
* 3)修改数据
* @param session
* @param kuduContext
*/
def upDATEData(session: SparkSession, kuduContext: KuduContext): Unit =
//定义数据
val data = List(person(1, "tom", 50, 1), person(2, "mark", 30, 0))
import session.implicits._
val dataFrame: DataFrame = data.toDF()
kuduContext.upDATERows(dataFrame, TABLE_NAME)
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于客快物流大数据项目(四十五):Spark操作Kudu DML操作的主要内容,如果未能解决你的问题,请参考以下文章
客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu