Spark中文手册7:Spark-sql由入门到精通续

Posted wanmeilingdu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中文手册7:Spark-sql由入门到精通续相关的知识,希望对你有一定的参考价值。

问题导读

1.sqlContext.cacheTable("tableName")与sqlContext.uncacheTable("tableName")它们的作用是什么?
2.Spark SQL CLI的作用是什么?
3.Spark SQL数据类型有哪些。如何访问它们?






性能调优


对于某些工作负载,可以在通过在内存中缓存数据或者打开一些实验选项来提高性能。
在内存中缓存数据 Spark SQL可以通过调用sqlContext.cacheTable("tableName")方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的 压力。你可以通过调用sqlContext.uncacheTable("tableName")方法在内存中删除表。
注意,如果你调用schemaRDD.cache()而不是sqlContext.cacheTable(...),表将不会用柱状格式来缓存。在这种情况下,sqlContext.cacheTable(...)是强烈推荐的用法。
可以在SQLContext上使用setConf方法或者在用SQL时运行SET key=value命令来配置内存缓存。

Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险




其它的配置选项
以下的选项也可以用来调整查询执行的性能。有可能这些选项会在以后的版本中弃用,这是因为更多的优化会自动执行。
Property Name Default Meaning
spark.sql.autoBroadcastJoinThreshold 10485760(10m) 配置一个表的最大大小(byte)。当执行join操作时,这个表将会广播到所有的worker节点。可以将值设置为-1来禁用广播。注意,目前的统计数据只支持Hive Metastore表,命令ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan已经在这个表中运行。
spark.sql.codegen false 当为true时,特定查询中的表达式求值的代码将会在运行时动态生成。对于一些拥有复杂表达式的查询,此选项可导致显著速度提升。然而,对于简单的查询,这个选项会减慢查询的执行
spark.sql.shuffle.partitions 200 配置join或者聚合操作shuffle数据时分区的数量



其它SQL接口


Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。
运行Thrift JDBC/ODBC服务器
这里实现的Thrift JDBC/ODBC服务器与Hive 0.12中的 HiveServer2相一致。你可以用在Spark 或者Hive 0.12附带的beeline脚本测试JDBC服务器。 在Spark目录中,运行下面的命令启动JDBC/ODBC服务器。
  1. ./sbin/start-thriftserver.sh
复制代码
这个脚本接受任何的bin/spark-submit命令行参数,加上一个--hiveconf参数用来指明Hive属性。你可以运行./sbin/start-thriftserver.sh --help来获得所有可用选项的完整 列表。默认情况下,服务器监听localhost:10000。你可以用环境变量覆盖这些变量。
  1. export HIVE_SERVER2_THRIFT_PORT=<listening-port>
  2. export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
  3. ./sbin/start-thriftserver.sh \\
  4.   --master <master-uri> \\
  5.   ...
复制代码
或者通过系统变量覆盖。
  1. ./sbin/start-thriftserver.sh \\
  2.   --hiveconf hive.server2.thrift.port=<listening-port> \\
  3.   --hiveconf hive.server2.thrift.bind.host=<listening-host> \\
  4.   --master <master-uri>
  5.   ...
复制代码
现在你可以用beeline测试Thrift JDBC/ODBC服务器。
  1. ./bin/beeline
复制代码
连接到Thrift JDBC/ODBC服务器的方式如下:
  1. beeline> !connect jdbc:hive2://localhost:10000
复制代码
Beeline将会询问你用户名和密码。在非安全的模式,简单地输入你机器的用户名和空密码就行了。对于安全模式,你可以按照 Beeline文档的说明来执行。

运行Spark SQL CLI
Spark SQL CLI是一个便利的工具,它可以在本地运行Hive元存储服务、执行命令行输入的查询。注意,Spark SQL CLI不能与Thrift JDBC服务器通信。 在Spark目录运行下面的命令可以启动Spark SQL CLI。
  1. ./bin/spark-sql
复制代码


编写语言集成(Language-Integrated)的相关查询
语言集成的相关查询是实验性的,现在暂时只支持scala。 Spark SQL也支持用领域特定语言编写查询。

  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  3. // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
  4. import sqlContext._
  5. val people: RDD[Person] = ... // An RDD of case class objects, from the first example.

  6. // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
  7. val teenagers = people.where('age >= 10).where('age <= 19).select('name)
  8. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码 DSL使用Scala的符号来表示在潜在表(underlying table)中的列,这些列以前缀(')标示。将这些符号隐式转换成由SQL执行引擎计算的表达式。你可以在ScalaDoc 中了解详情。


Spark SQL数据类型

  • 数字类型
    • ByteType:代表一个字节的整数。范围是-128到127
    • ShortType:代表两个字节的整数。范围是-32768到32767
    • IntegerType:代表4个字节的整数。范围是-2147483648到2147483647
    • LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807
    • FloatType:代表4字节的单精度浮点数
    • DoubleType:代表8字节的双精度浮点数
    • DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成
    • StringType:代表一个字符串值
    • BinaryType:代表一个byte序列值
    • BooleanType:代表boolean值
    • Datetime类型
      • TimestampType:代表包含字段年,月,日,时,分,秒的值
      • DateType:代表包含字段年,月,日的值
    • 复杂类型
      • ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。containsNull用来指明ArrayType中的值是否有null值
      • MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。valueContainsNull用来指明MapType中的值是否有null值
      • StructType(fields):表示一个拥有StructFields (fields)序列结构的值
        • StructField(name, dataType, nullable):代表StructType中的一个字段,字段的名字通过name指定,dataType指定field的数据类型,nullable表示字段的值是否有null值。



Spark的所有数据类型都定义在包org.apache.spark.sql中,你可以通过import org.apache.spark.sql._访问它们。
数据类型 Scala中的值类型 访问或者创建数据类型的API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType scala.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull]) 注意containsNull默认为true
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull]) 注意valueContainsNull默认为true
StructType org.apache.spark.sql.Row StructType(fields) ,注意fields是一个StructField序列,相同名字的两个StructField不被允许
StructField The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType) StructField(name, dataType, nullable)



以上是关于Spark中文手册7:Spark-sql由入门到精通续的主要内容,如果未能解决你的问题,请参考以下文章

Spark Graphx编程指南

Spark-SQL的具体编程场景

基于spark1.4的Spark-Sql

spark-sql case when 问题

spark-sql 与 spark-shell REPL 中的 Spark SQL 性能差异

在 HIVE 上插入 Spark-SQL 插件