如何使用 AvroParquetWriter 从 scala 案例类制作镶木地板文件?

Posted

技术标签:

【中文标题】如何使用 AvroParquetWriter 从 scala 案例类制作镶木地板文件?【英文标题】:How to make a parquet file from scala case class using AvroParquetWriter? 【发布时间】:2017-08-23 14:13:45 【问题描述】:

我有一个类似下面的案例类:

case class Person(id:Int,name: String)

现在,我编写了以下方法,使用 AvroParquetWriterSeq[T] 制作镶木地板文件。

  def writeToFile[T](data: Iterable[T], schema: Schema, path: String, accessKey: String, secretKey: String): Unit = 
    val conf = new Configuration

    conf.set("fs.s3.awsAccessKeyId", accessKey)
    conf.set("fs.s3.awsSecretAccessKey", secretKey)

    val s3Path = new Path(path)
    val writer = AvroParquetWriter.builder[T](s3Path)
      .withConf(conf)
      .withSchema(schema)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
      .asInstanceOf[ParquetWriter[T]]

    data.foreach(writer.write)

    writer.close()
  

架构是:

val schema = SchemaBuilder
    .record("Person")
      .fields()
      .requiredInt("id")
      .requiredString("name")
      .endRecord()

现在,当我使用以下代码调用 writeToFile 时,出现异常:

val personData = Seq(Person(1,"A"),Person(2,"B"))

ParquetService.writeToFile[Person](
      data = personData,
      schema = schema,
      path = s3Path,
      accessKey = accessKey,
      secretKey = secretKey

java.lang.ClassCastException:com.entities.Person 无法转换为 org.apache.avro.generic.IndexedRecord

为什么不能将Person 强制转换为IndexedRecord?我需要做些什么来摆脱这个异常吗?

【问题讨论】:

【参考方案1】:

我有一个类似的问题,根据这个例子

https://github.com/apache/parquet-mr/blob/f84938441be49c665595c936ac631c3e5f171bf9/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectReadWrite.java#L141

您缺少对 writer builder 的一个方法调用。

val writer = AvroParquetWriter.builder[T](s3Path)
  .withConf(conf)
  .withSchema(schema)
  .withDataModel(ReflectData.get) //This one
  .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
  .build()

另外,如果您希望在数据中支持空值,您可以使用ReflectData.AllowNull.get()

【讨论】:

以上是关于如何使用 AvroParquetWriter 从 scala 案例类制作镶木地板文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 graphql 从 Firebase 使用 Flutter 从 Cloud Firestore 获取数据?

如何使用颤振提供程序从 Firestore 获取数据?

如何使用 oauth 从用户那里获取详细信息? [关闭]

如何从命令行(使用 kotlinc)使用 kapt?

如何使用 SWIG 从 C 调用 C# 方法?

如何从 python 使用 mongolab 插件到 Heroku?