如何将 RDD [GenericRecord] 转换为 scala 中的数据框?

Posted

技术标签:

【中文标题】如何将 RDD [GenericRecord] 转换为 scala 中的数据框?【英文标题】:How to convert RDD[GenericRecord] to dataframe in scala? 【发布时间】:2017-11-13 12:46:34 【问题描述】:

我使用 Avro(序列化器和反序列化器)从 kafka 主题中获取推文。 然后我创建了一个 spark 消费者,它在 RDD [GenericRecord] 的 Dstream 中提取推文。 现在我想将每个 rdd 转换为一个数据框,以通过 SQL 分析这些推文。 请有任何将 RDD[GenericRecord] 转换为数据帧的解决方案?

【问题讨论】:

你能通过 foreach(println) 更新一些 RDD[GenericRecord] 的样本数据吗? 【参考方案1】:

我花了一些时间尝试完成这项工作(特别是如何正确反序列化数据,但看起来你已经涵盖了这个)...更新

  //Define function to convert from GenericRecord to Row
  def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = 
    val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
    import scala.collection.JavaConversions._
    for (field <- record.getSchema.getFields) 
      objectArray(field.pos) = record.get(field.pos)
    

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  

//Inside your stream foreachRDD
val yourGenericRecordRDD = ... 
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))

var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])

如您所见,我正在使用 SchemaConverter 从您用于反序列化的模式中获取数据帧结构(这对于模式注册表可能会更痛苦)。为此,您需要以下依赖项

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>

您需要根据自己的情况更改您的 spark 版本。

更新:上面的代码仅适用于 flat avro 模式。

对于嵌套结构,我使用了不同的东西。您可以复制SchemaConverters 类,它必须在com.databricks.spark.avro 内部(它使用databricks 包中的一些受保护类),或者您可以尝试使用spark-bigquery 依赖项。默认情况下无法访问该类,因此您需要在包com.databricks.spark.avro 中创建一个类来访问工厂方法。

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

class SchemaConverterUtils 

  def converterSql(schema : Schema, sqlType : StructType) = 
    createConverterToSQL(schema, sqlType)
  


之后你应该能够像

那样转换数据
val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
/// 
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
... 
val rowRdd = genericRecordRDD.flatMap(record => 
        Try(converter(record).asInstanceOf[Row]).toOption
      )
//To DataFrame
 val df = sqlContext.createDataFrame(rowRdd, sqlType)

【讨论】:

方法 createDataFrame 需要作为参数 RDD[ROW] 和 structType 但在我的情况下我有 createDataFramei 有 RDD[GenericRecord] 耶!只需确保您查看我的最后一次更改。在创建原始数据之前,我错过了将值传递给 objectArray 对象【参考方案2】:

https://***.com/a/48828303/5957143 和 https://***.com/a/47267060/5957143 的组合适合我。

我使用以下创建 MySchemaConversions

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions 
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]

然后我用了

val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)

// unionedResultRdd 是 unionRDD[GenericRecord]

var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
 val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])

在对象 MyObject 中包含 myConverter 的好处是您不会遇到序列化问题 (java.io.NotSerializableException)。

object MyObject
    def myConverter(record: GenericRecord,
        myAvroRecordConverter: (GenericRecord) => Row): Row =
            myAvroRecordConverter.apply(record)

【讨论】:

我们将如何做 spark 3.0.1 版本【参考方案3】:

即使这样的事情可能对你有所帮助,

val stream = ...

val dfStream = stream.transform(rdd:RDD[GenericRecord]=>
     val df = rdd.map(_.toSeq)
              .map(seq=> Row.fromSeq(seq))
              .toDF(col1,col2, ....)

     df
)

我想建议您另一种方法。使用 Spark 2.x,您可以跳过创建 DStreams 的整个过程。相反,您可以使用结构化流来执行类似的操作,

val df = ss.readStream
  .format("com.databricks.spark.avro")
  .load("/path/to/files")

这将为您提供一个可以直接查询的数据框。这里,ss 是 spark session 的实例。 /path/to/files 是从 kafka 转储所有 avro 文件的地方。

PS:您可能需要导入spark-avro

libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"

希望这会有所帮助。干杯

【讨论】:

我没有文件,我想将 RDD[GenericRecord] 转换为数据帧【参考方案4】:

您可以使用 SQLContext 对象中可用的 createDataFrame(rowRDD: RDD[Row], schema: StructType)。旧 DataFrame 的 RDD 转换示例:

import sqlContext.implicits.
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

请注意,无需显式设置任何架构列。我们重用了旧的 DF 的模式,它是 StructType 类的,可以很容易地扩展。但是,这种方法有时是行不通的,在某些情况下可能不如第一种方法有效。

【讨论】:

我没有旧数据框。我只有 RDD[GenericRecord] 您创建您的架构。// 示例:val innerSchema = StructType( Array( StructField("value", StringType), StructField("count", LongType) ) )

以上是关于如何将 RDD [GenericRecord] 转换为 scala 中的数据框?的主要内容,如果未能解决你的问题,请参考以下文章

使用可序列化函数读取大查询 - 如何从 GenericRecord 获取 NUMERIC 类型

GenericRecord 的 Avro 架构:能够保留空白字段

KafkaAvroDeserializer 不返回 SpecificRecord 但返回 GenericRecord

Spark RDD API详解(转)

Spark学习之路 Spark之RDD[转]

Spark RDD转DataFrame