如何将 RDD [GenericRecord] 转换为 scala 中的数据框?
Posted
技术标签:
【中文标题】如何将 RDD [GenericRecord] 转换为 scala 中的数据框?【英文标题】:How to convert RDD[GenericRecord] to dataframe in scala? 【发布时间】:2017-11-13 12:46:34 【问题描述】:我使用 Avro(序列化器和反序列化器)从 kafka 主题中获取推文。 然后我创建了一个 spark 消费者,它在 RDD [GenericRecord] 的 Dstream 中提取推文。 现在我想将每个 rdd 转换为一个数据框,以通过 SQL 分析这些推文。 请有任何将 RDD[GenericRecord] 转换为数据帧的解决方案?
【问题讨论】:
你能通过 foreach(println) 更新一些 RDD[GenericRecord] 的样本数据吗? 【参考方案1】:我花了一些时间尝试完成这项工作(特别是如何正确反序列化数据,但看起来你已经涵盖了这个)...更新
//Define function to convert from GenericRecord to Row
def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row =
val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
import scala.collection.JavaConversions._
for (field <- record.getSchema.getFields)
objectArray(field.pos) = record.get(field.pos)
new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
//Inside your stream foreachRDD
val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))
var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])
如您所见,我正在使用 SchemaConverter 从您用于反序列化的模式中获取数据帧结构(这对于模式注册表可能会更痛苦)。为此,您需要以下依赖项
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>3.2.0</version>
</dependency>
您需要根据自己的情况更改您的 spark 版本。
更新:上面的代码仅适用于 flat avro 模式。
对于嵌套结构,我使用了不同的东西。您可以复制SchemaConverters 类,它必须在com.databricks.spark.avro
内部(它使用databricks 包中的一些受保护类),或者您可以尝试使用spark-bigquery 依赖项。默认情况下无法访问该类,因此您需要在包com.databricks.spark.avro
中创建一个类来访问工厂方法。
package com.databricks.spark.avro
import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
class SchemaConverterUtils
def converterSql(schema : Schema, sqlType : StructType) =
createConverterToSQL(schema, sqlType)
之后你应该能够像
那样转换数据val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
///
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
...
val rowRdd = genericRecordRDD.flatMap(record =>
Try(converter(record).asInstanceOf[Row]).toOption
)
//To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
【讨论】:
方法 createDataFrame 需要作为参数 RDD[ROW] 和 structType 但在我的情况下我有 createDataFramei 有 RDD[GenericRecord] 耶!只需确保您查看我的最后一次更改。在创建原始数据之前,我错过了将值传递给 objectArray 对象【参考方案2】:https://***.com/a/48828303/5957143 和 https://***.com/a/47267060/5957143 的组合适合我。
我使用以下创建 MySchemaConversions
package com.databricks.spark.avro
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType
object MySchemaConversions
def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
然后我用了
val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)
// unionedResultRdd 是 unionRDD[GenericRecord]
var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])
在对象 MyObject 中包含 myConverter 的好处是您不会遇到序列化问题 (java.io.NotSerializableException)。
object MyObject
def myConverter(record: GenericRecord,
myAvroRecordConverter: (GenericRecord) => Row): Row =
myAvroRecordConverter.apply(record)
【讨论】:
我们将如何做 spark 3.0.1 版本【参考方案3】:即使这样的事情可能对你有所帮助,
val stream = ...
val dfStream = stream.transform(rdd:RDD[GenericRecord]=>
val df = rdd.map(_.toSeq)
.map(seq=> Row.fromSeq(seq))
.toDF(col1,col2, ....)
df
)
我想建议您另一种方法。使用 Spark 2.x,您可以跳过创建 DStreams
的整个过程。相反,您可以使用结构化流来执行类似的操作,
val df = ss.readStream
.format("com.databricks.spark.avro")
.load("/path/to/files")
这将为您提供一个可以直接查询的数据框。这里,ss
是 spark session 的实例。 /path/to/files
是从 kafka 转储所有 avro 文件的地方。
PS:您可能需要导入spark-avro
libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"
希望这会有所帮助。干杯
【讨论】:
我没有文件,我想将 RDD[GenericRecord] 转换为数据帧【参考方案4】:您可以使用 SQLContext 对象中可用的 createDataFrame(rowRDD: RDD[Row], schema: StructType)。旧 DataFrame 的 RDD 转换示例:
import sqlContext.implicits.
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
请注意,无需显式设置任何架构列。我们重用了旧的 DF 的模式,它是 StructType 类的,可以很容易地扩展。但是,这种方法有时是行不通的,在某些情况下可能不如第一种方法有效。
【讨论】:
我没有旧数据框。我只有 RDD[GenericRecord] 您创建您的架构。// 示例:val innerSchema = StructType( Array( StructField("value", StringType), StructField("count", LongType) ) )以上是关于如何将 RDD [GenericRecord] 转换为 scala 中的数据框?的主要内容,如果未能解决你的问题,请参考以下文章
使用可序列化函数读取大查询 - 如何从 GenericRecord 获取 NUMERIC 类型
GenericRecord 的 Avro 架构:能够保留空白字段