10.spark sql之快速入门

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10.spark sql之快速入门相关的知识,希望对你有一定的参考价值。

前世今生

Hive&Shark

??随着大数据时代的来临,Hadoop风靡一时。为了使熟悉RDBMS但又不理解MapReduce的技术人员快速进行大数据开发,Hive应运而生。Hive是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。

??但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率。为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:

  • MapR的Drill
  • Cloudera的Impala
  • Shark

??Shark是伯克利实验室Spark生态的组件之一,它修改了Hive Driver的内存管理、物理计划、执行三个模块,使之能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升。

技术分享图片

Shark&Spark SQL

??Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的One Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。

??SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码。由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大地提升。

  • 数据兼容方面

??不但兼容Hive,还可以从RDD、parquet文件、JSON文件中获取数据,也支持获取RDBMS数据以及cassandra等NOSQL数据。

  • 性能优化方面

??除了采取In-Memory Columnar Storage、byte-code generation等优化技术外,引进Cost Model对查询进行动态评估、获取最佳物理计划等。

  • 组件扩展方面

??无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。

??2014年Shark停止开发,团队将所有资源放SparkSQL项目上,至此,Shark的发展画上了句号,但也因此发展出两条线:SparkSQL和Hive on Spark。

技术分享图片

??其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

简介

??Spark SQL是一个用于结构化数据处理的模块。Spark SQL赋予待处理数据一些结构化信息,可以使用SQL语句或DataSet API接口与Spark SQL进行交互。

  • SQL

??Spark SQL可以使用sql读写Hive中的数据;也可以在编程语言中使用sql,返回Dataset/DataFrame结果集。

  • DataSets&DataFrames

??Dataset是一个分布式数据集,它结合了RDD与SparkSQL执行引擎的优点。Dataset可以通过JVM对象构造,然后使用算子操作进行处理。Java和Scala都有Dataset API;Python和R本身支持Dataset特性。

??DataFrame是一个二维结构的DataSet,相当于RDBMS中的表。DataFrame可以有多种方式构造,比如结构化数据文件、hive表、外部数据库、RDD等。在Scala、Java、Python及R中都有DataFrame API。

DataFrame与DataSet

DataFrame创建及操作

  • scala
import org.apache.spark.sql.SparkSession

// 构造SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// 创建DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame操作
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • java
import org.apache.spark.sql.SparkSession;

//构造SparkSession
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

//创建DataFrame
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

//DataFrame操作
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • python
from pyspark.sql import SparkSession

# 构造SparkSession
spark = SparkSession     .builder     .appName("Python Spark SQL basic example")     .config("spark.some.config.option", "some-value")     .getOrCreate()

# 创建DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# DataFrame操作
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df[‘name‘], df[‘age‘] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df[‘age‘] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

DataSet创建及操作

??Datasets和RDD类似,但使用专门的Encoder编码器来序列化需要经过网络传输的数据对象,而不用RDD使用的Java序列化或Kryo库。Encoder编码器是动态生成的代码,允许直接执行各种算子操作,而不用反序列化。

  • scala
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • java
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

SQL操作

  • scala
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
//df.createGlobalTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
//df.createGlobalTempView("people")

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • python
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# df.createGlobalTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

以上是关于10.spark sql之快速入门的主要内容,如果未能解决你的问题,请参考以下文章

my2sql工具之快速入门

大数据-10-Spark入门之支持向量机SVM分类器

深入浅出Mybatis之快速入门!

深入浅出Mybatis之快速入门!

mybatis快速入门

sql [SQL查询片段]用于在命令行或通过R和其他工具使用SQL的快速代码段#tags:sql,R,text processing,命令li