Introducing Apache Spark Datasets

Posted 小帆的帆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Introducing Apache Spark Datasets相关的知识,希望对你有一定的参考价值。

原文连接:https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

开发者已经喜欢上了Apache Spark提供的简洁而强大的API, 使用它们可以很容易的分析复杂问题。在Databricks,我们不断的加强Spark的可用性和性能,从而引入了DataFrameSpark SQL。这些高级的API, 用于处理结构化数据(例如数据库表,JSON文件),能够使Spark自动完成存储和计算的优化,而这是Catalyst optimizer和Tungsten的功劳。但这种优化在RDD中是不可能的,比如通过原始二进制形式操作数据。

今天我们推出了Dataset,它是DataFrame的扩展,提供类型安全和面向对象的编程接口。Spark1.6包括Dataset的预览版API,它会是原来几个版本开发的重点。Like DataFrames, Datasets take advantage of Spark’s Catalyst optimizer by exposing expressions and data fields to a query planner。Dataset还支持Tungsten的快速内存编码(Encoder)。Dataset继承了这些优点并且是类型安全的,这意味着在编译时能够检查错误。同时还拥有面向对象风格的接口。

从长远来看,我们期望能够利用Dataset写出更高效的Spark应用。We have designed them to work alongside the existing RDD API,但处理效率会更高。Spark 1.6 初步的展示了DataSet,我们将会在未来的版本中改进它们。

Working with Datasets

A Dataset is a strongly-typed, immutable collection of objects that are mapped to a relational schema。在Dataset API的核心是一个新的概念叫Encoder,它负责JVM对象和tabular representation之间的转换。The tabular representation is stored using Spark’s internal Tungsten binary format, allowing for operations on serialized data and improved memory utilization. Spark 1.6 支持多种类型的Encoder,包括原始数据类型(如String, Integer, Long),Scala case class和 Java Bean。

RDD用户会发现Dataset API相当熟悉,因为它提供了许多相同的transformation(例如,map, flatMap, filter)。下面是一个WordCount的例子:

RDD

val lines = sc.textFile("/wikipedia")
val words = lines
  .flatMap(_.split(" "))
  .filter(_ != "")

Dataset

val lines = sqlContext.read.text("/wikipedia").as[String]
val words = lines
  .flatMap(_.split(" "))
  .filter(_ != "")

两个API都很容易使用lambda表达式来表达transformation。编译器和IDE了解你使用了什么类型,所以能提供有用的提示和错误信息。

While this high-level code may look similar syntactically, with Datasets you also have access to all the power of a full relational execution engine. 例如,一个聚合操作,如计算每个单词的出现次数:

RDD

val counts = words
    .groupBy(_.toLowerCase)
    .map(w => (w._1, w._2.size))

Dataset

val counts = words 
    .groupBy(_.toLowerCase)
    .count()

Dataset版本的WordCount使用了内置的aggregate count,这种方式不仅代码量更少,而且执行的更快。如下图所示,Dataset运行速度要比RDD快得多。相比之下,使用RDD则需要考虑如果表达效率会更高。

Dataset的另一个好处是减少内存使用量。由于Spark理解Dataset的数据结构,所以当缓存Dataset时能够在内存中创建更优化的结构,占用更小的内存。下面比较Dataset和DataFrame在缓存白万级String时的表现。缓存,对于双方来说都能够导致性能的提升。但是,由于Dataset Encoder给Spark提供更多关于数据存储的信息,所以缓存占用空间更小,大概比RDD小4.5倍。

Lightning-fast Serialization with Encoders

Encoders are highly optimized and use runtime code generation to build custom bytecode for serialization and deserialization. 因此,速度明显快于java或kryo序列化器。

除了速度,Encoder序列化占用的空间更小(2倍),降低了网络传输成本。此外,序列化的数据是Tungsten二进制格式,这意味着许多时候都可以直接操作数据,而不需要实例化整个对象。Spark支持的Encoder有原始类型(如String, Integer, Long),Scala case class和Java bean。未来会加入可自定义类型的Encoder。

Seamless Support for Semi-Structured Data

Encoder还可以作为一个强大的桥梁,连接半结构化的格式(例如JSON)和类型安全的语言如Java和Scala。

例如,以下关于大学的数据集

"name": "UC Berkeley", "yearFounded": 1868, numStudents: 37581
"name": "MIT", "yearFounded": 1860, numStudents: 11318

你可以简单地定义一个类,并将输入数据映射到类上,而不是人工提取字段,再把它们转换成期望的类型。Spark会自动识别名称和类型。

case class University(name: String, numStudents: Long, yearFounded: Long)
val schools = sqlContext.read.json("/schools.json").as[University]
schools.map(s => s"$s.name is $2015 - s.yearFounded years old")

Encoder在映射的过程中,会先检查定义的类的类型是否与数据相符,如果不相符,则能够提供有用的错误信息,防止以不正确的方式处理TB级的数据。例如,如果我们使用的数据类型太小,转换到一个对象时会导致截断,这时Analyzer会抛出AnalysisException,如:如果numStudents是byte类型的,当有数据超过255时就会报错。

case class University(numStudents: Byte)
val schools = sqlContext.read.json("/schools.json").as[University]

org.apache.spark.sql.AnalysisException: Cannot upcast `yearFounded` from bigint to smallint as it may truncate

执行映射时,Encoder将自动处理复杂类型,包括嵌套类、array和map。

A Single API for Java and Scala

Dataset的另一个目标是为Scala和java提供统一的接口。这种统一对应Java用户来说是个好消息,因为它确保了Java的接口不会落后于Scala。代码实例也会变得更通用,and libraries no longer have to deal with two slightly different types of input。对于Java用户唯一不同的是需要指定Encoder,因为编译器不提供类型信息。例如,如果想要处理JSON数据使用Java可以这样做:

public class University implements Serializable 
    private String name;
    private long numStudents;
    private long yearFounded;

    public void setName(String name) ...
    public String getName() ...
    public void setNumStudents(long numStudents) ...
    public long getNumStudents() ...
    public void setYearFounded(long yearFounded) ...
    public long getYearFounded() ...


class BuildString implements MapFunction<University, String> 
    public String call(University u) throws Exception 
        return u.getName() + " is " + (2015 - u.getYearFounded()) + " years old.";
    


Dataset<University> schools = context.read().json("/schools.json").as(Encoders.bean(University.class));
Dataset<String> strings = schools.map(new BuildString(), Encoders.STRING());

Looking Forward

Dataset是一个新的API,它很容易与RDD和现有Spark项目相融合。只需要通过Dataset的rdd方法,就能将Dataset转换成RDD。从长远来看,我们希望,Dataset可以成为编程时的首选。

Spark 2.0 版本的Dataset,我们计划做以下改进:

  • 性能优化:In many cases, the current implementation of the Dataset API does not yet leverage the additional information it has and can be slower than RDDs. 。在接下来的几个版本中,我们将致力于改善这些新API的性能。
  • 自定义Encoder:开放自定义Encoder的API。
  • Python支持。
  • 统一DataFrame和Dataset:为了保证兼容性,DataFrame和Dataset目前不是继承自共同的父类。Spark 2.0 版本,我们将会统一这些抽象,并尽可能不改变现有的
    API,使得开发库时能够更容易的兼容DataFrame和Dataset。

如果你想尝试一下Dataset。我们提供的以下例子:Working with Classes, Word Count

以上是关于Introducing Apache Spark Datasets的主要内容,如果未能解决你的问题,请参考以下文章

.NET for Apache® Spark? 开源大数据分析工具

Spark Kotlin - 创建空数据集

Introducing Deep Reinforcement

Introducing the Microservices Reference Architecture from NGINX

Introducing RecyclerView

iOS Framework: Introducing MKNetworkKit