Spark---Dataset

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark---Dataset相关的知识,希望对你有一定的参考价值。

文章目录

Dataset

公共类Dataset
扩展对象
实现scala.Serializable
数据集是特定于域的对象的强类型集合,可以使用功能或关系操作并行转换它们。每个数据集还具有一个称为a的无类型视图DataFrame,该视图是的数据集Row。

官方参考文档:点这里

package testrdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object TestDataSet {

  case class Point(label:String,x:Double,y:Double)
  case class Category(id:Int,name:String)
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("dateSet")
    val sc = SparkContext.getOrCreate(conf)
    val spark = SparkSession.builder().master("local[*]").appName("dataset")
      .config("spark.sql.crossJoin.enabled",true).getOrCreate()
    import spark.implicits._
    //第一种参数Seq
//    val ds = spark.createDataset(1 to 6)
//    ds.show()

    //第二种参数Array
//    val ds2 = spark.createDataset(List(1,2,3,4))
//    ds2.show()

    //第三种参数 RDD
//    val ds3 = spark.createDataset(sc.parallelize(List((1,"Tom",23),(2,"jack",33))))
//    ds3.show()

    //--------------------------------------------------------------------
    val pointsRDD = sc.parallelize(List(("Tom",3.4,4.0),("Jack",5.8,12.0)))
    val catesRDD = sc.parallelize(List((1,"Jack"),(2,"Tom")))

    val points = pointsRDD.map(lines=>Point(lines._1,lines._2,lines._3)).toDS()
    val categories = catesRDD.map(line=>Category(line._1,line._2)).toDS()

    points.join(categories).show()  //全连接---笛卡尔,不推荐
    points.join(categories,points("label")===categories("name") ).show //内连接
  }
}

Catalyst

RDD是Spark的核心,Catalyst优化器是Spark SQL的核心
将我们编写的SQL,转化成RDD执行,类似于Hive将HiveSQL转化成mapreduce过程
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

以上是关于Spark---Dataset的主要内容,如果未能解决你的问题,请参考以下文章

Spark2 DataSet 创建新行之flatMap

`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`

Spark DataSet 日期时间解析

Spark---Dataset

Spark DataSet 过滤器性能

Spark Dataset - 如何通过修改现有列值来创建新列