如何将 Spark 数据集与 Thrift 一起使用

Posted

技术标签:

【中文标题】如何将 Spark 数据集与 Thrift 一起使用【英文标题】:How can I use Spark Dataset with Thrift 【发布时间】:2016-02-06 10:37:43 【问题描述】:

我的数据格式是用 apache thrift 定义的,代码由 scrooge 生成。我使用 parquet 将其存储在 spark 中,非常类似于 blog 中的说明。

我可以很容易地将该数据读回 Dataframe,只需执行以下操作:

val df = sqlContext.read.parquet("/path/to/data")

而且我可以在 RDD 中通过更多的体操来阅读它:

def loadRdd[V <: TBase[_, _]](inputDirectory: String, vClass: Class[V]): RDD[V] = 
    implicit val ctagV: ClassTag[V] = ClassTag(vClass)
    ParquetInputFormat.setReadSupportClass(jobConf, classOf[ThriftReadSupport[V]])
    ParquetThriftInputFormat.setThriftClass(jobConf, vClass)
    val rdd = sc.newAPIHadoopFile(
      inputDirectory, classOf[ParquetThriftInputFormat[V]], classOf[Void], vClass, jobConf)
    rdd.asInstanceOf[NewHadoopRDD[Void, V]].values
  
loadRdd("/path/to/data", classOf[MyThriftClass])

我的问题是:如何在 spark 1.6 发布的新 Dataset api 中访问这些数据?我想要这样做的原因是数据集 api 的好处:类型安全性与数据帧的速度相同。

我知道需要某种编码器,并且已经为原始类型和案例类提供了这些编码器,但我所拥有的是节俭生成的代码(java 或 scala 之一,任何一个都符合要求),看起来确实很像一个案例类,但实际上并不是一个。

我尝试了明显的选项,但没有奏效:

val df = sqlContext.read.parquet("/path/to/data")

df.as[MyJavaThriftClass]

<console>:25: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._  Support for serializing other types will be added in future releases.

df.as[MyScalaThriftClass]

scala.ScalaReflectionException: <none> is not a term
  at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
  at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:492)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided


df.as[MyScalaThriftClass.Immutable]

java.lang.UnsupportedOperationException: No Encoder found for org.apache.thrift.protocol.TField
- field (class: "org.apache.thrift.protocol.TField", name: "field")
- array element class: "com.twitter.scrooge.TFieldBlob"
- field (class: "scala.collection.immutable.Map", name: "_passthroughFields")
- root class: "com.worldsense.scalathrift.ThriftRange.Immutable"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.toCatalystArray$1(ScalaReflection.scala:419)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:537)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
  at scala.collection.immutable.List.flatMap(List.scala:327)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
  at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
  ... 48 elided

似乎无形的works fine 带有 Thrift 生成的代码,我想知道是否可以使用它来生成当前编码器 api 可以接受的东西。

有什么提示吗?

【问题讨论】:

你看过frameless吗? 嗨@MilesSabin,这看起来很有希望,但是通过查看代码,我无法弄清楚如果没有案例类它是否可以工作。事实上,似乎唯一的公共 API RichDataSet 已经以 Dataset 开头。我会ping gitter 频道,看看作者有没有好的建议。 你发现了吗? 【参考方案1】:

应该可以通过将Encoders.bean(My.getClass) 作为显式隐式传递来解决此问题。

示例:df.as[MyJavaThriftClass](Encoders.bean(MyJavaThriftClass.getClass))

【讨论】:

以上是关于如何将 Spark 数据集与 Thrift 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

将数据集与 xarray 合并使变量为 nan

Spark 2.0 数据集与数据帧

如何在CDH中启用Spark Thrift

Spark的thrift端口

Spark Thrift JDBCServer应用场景解析与实战案例

spark thrift server 与 网易 kyuubi thrift server