SparkSQL 和 UDT

Posted

技术标签:

【中文标题】SparkSQL 和 UDT【英文标题】:SparkSQL and UDT 【发布时间】:2015-03-16 16:56:50 【问题描述】:

我尝试使用 SparkSQL (v.1.3.0) 访问 PostgreSQL 数据库。在这个数据库中我有一个表

CREATE TABLE test (
 id bigint,
 values double precision[]
);

为了访问表格,我使用

val sparkConf = new SparkConf().setAppName("TestRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://...",
  "dbtable" -> "schema.test",
  "user" -> "...",
  "password" -> "..."))

sqlContext.sql("SELECT * FROM schema.test")

但是,每次我尝试访问包含此数组的表时,我都会得到一个 java.sql.SQLException: Unsupported type 2003

我在 Spark 测试代码中找到了一个示例,该示例在 Spark 中为二维点创建 UDT(请参阅 ExamplePointUDT.scala)。但是,我不明白我怎么可能使用这个代码。

【问题讨论】:

今天在研究 SparkSQL UDT 时,我发现它还不是一个稳定的公共 API,每个 mailing list 和 source annotation/comments。 即使在使用 spark 通过 jbdc 访问 hive2 服务器时,我也会收到此错误,例如 hive.load("jdbc", Map("url" -> "jdbc:hive2://ip:port /;auth=noSasl", "driver" -> "org.apache.hive.jdbc.HiveDriver", "dbtable" -> "default.weeks", "user" -> "user", "password" -> " " )) 【参考方案1】:

至少在 pyspark 中可以通过在查询中进行强制转换来实现这一点。 不要让不受支持的类型达到火花,将它们投射到您的数据库,然后在获得表格后将它们投射回来。

我不确定语法是否正确,但它会是这样的:

val query_table = "(SELECT id, CAST(values AS TEXT) FROM schema.test) AS casted_table"

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://...",
  "dbtable" -> query_table,
  "user" -> "...",
  "password" -> "..."))

jdbcDF.map(x => (x.id, x.values.toArray))

我很确定没有.toArray 会将字符串表示形式转换回数组,它只是占位符代码。但现在只是正确解析它的问题。

当然,这只是一个补丁,但它可以工作。

【讨论】:

以上是关于SparkSQL 和 UDT的主要内容,如果未能解决你的问题,请参考以下文章

SparkSQL的入门实践教程

sparksql怎么show所有数据

SparkSQL、Thrift 服务器和 Tableau

RDD和SparkSQL综合应用

SparkSQL 和局部性

求问怎么设置sparksql读取hive的数据库