Spark:写入 Avro 文件

Posted

技术标签:

【中文标题】Spark:写入 Avro 文件【英文标题】:Spark: Writing to Avro file 【发布时间】:2014-01-03 22:35:20 【问题描述】:

我在 Spark 中,我有一个来自 Avro 文件的 RDD。我现在想对该 RDD 进行一些转换并将其保存为 Avro 文件:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath, 
  classOf[AvroKey[GenericRecord]], 
  classOf[org.apache.hadoop.io.NullWritable], 
  classOf[AvroKeyOutputFormat[GenericRecord]], 
  job.getConfiguration)

运行此 Spark 时抱怨 Schema$recordSchema 不可序列化。

如果我取消注释 .map 调用(并且只有 rdd.saveAsNewAPIHadoopFile),则调用成功。

我在这里做错了什么?

有什么想法吗?

【问题讨论】:

能否提供异常堆栈跟踪? Spark、Hadoop 和 Avro 版本号也可能有用。 请原谅我的幼稚。请问这里的工作是做什么的?看起来这是一个地图减少工作?如果我们使用 spark 写出,为什么我们需要 map reduce 作业? 【参考方案1】:

这里的问题与 Job 中使用的 avro.Schema 类的不可序列化有关。当您尝试从 map 函数内部的代码中引用架构对象时,将引发异常。

例如,如果您尝试执行以下操作,您将得到 “Task not serializable” 异常:

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => 
  // reference to the schema object declared outside
  val record = new GenericData.Record(schema)
)

您可以通过在功能块内创建模式的新实例来使一切正常工作:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => 
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innserSchema)
  ...
)

由于您不希望为您处理的每条记录解析 avro 架构,因此更好的解决方案是在分区级别解析架构。以下也有效:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => 
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => 
    val record = new GenericData.Record(innserSchema)
    ...
    // this closure will be bundled together with the outer one 
    // (no serialization issues)
  )
)

只要您提供对 jsonSchema 文件的可移植引用,上面的代码就可以工作,因为 map 函数将由多个远程执行程序执行。它可以是对 HDFS 中文件的引用,也可以与 JAR 中的应用程序一起打包(在后一种情况下,您将使用类加载器函数来获取其内容)。

对于那些尝试将 Avro 与 Spark 一起使用的人,请注意仍然存在一些未解决的编译问题,您必须在 Maven POM 上使用以下导入:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
<dependency>

注意"hadoop2" 分类器。您可以通过https://issues.apache.org/jira/browse/SPARK-3039 跟踪问题。

【讨论】:

当我们的 map 函数内部没有外部依赖时,这个方法可以正常工作。有什么方法可以使架构可序列化?【参考方案2】:

Spark 使用的默认序列化器是 Java 序列化。因此,对于所有 java 类型,它将尝试使用 Java 序列化进行序列化。 AvroKey 不可序列化,因此您会遇到错误。

您可以在自定义序列化中使用 KryoSerializer 或插件(如 Avro)。您可以在此处阅读有关序列化的更多信息。 http://spark-project.org/docs/latest/tuning.html

您还可以用可外部化的东西来包装您的对象。例如,在此处查看包装 AvroFlumeEvent 的 SparkFlumeEvent:https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

【讨论】:

【参考方案3】:

使用dataframe,使用databrics库创建avro非常简单。

dataframe.write.format("com.databricks.spark.avro").avro($hdfs_path)

在您的情况下,输入是 avro,因此它将具有与之关联的架构,因此您可以直接将 avro 读入数据帧,并且在转换后您可以使用上述代码写入 avro。

将 avro 读入数据帧:

火花 1.6

val 数据框 =sqlContext.read.avro($hdfs_path) 或 val 数据框 = sqlContext.read.format("com.databricks.spark.avro").load($hdfs_path)

火花 2.1

val 数据框 =sparkSession.read.avro($hdfs_path) 或 val 数据框 = sparkSession.read.format("com.databricks.spark.avro").load($hdfs_path)

【讨论】:

以上是关于Spark:写入 Avro 文件的主要内容,如果未能解决你的问题,请参考以下文章

如何在 jupyter notebook 中将 spark 数据帧写入 avro 文件格式?

在 Spark 2.0 中从 AVRO 写入镶木地板时出现 NullPointerException

无法使用 Spark 2.4.3 写入 Redshift

性能:Google Dataflow 将 avro 文件写入 GCS

从 BigQuery 读取数据并将其写入云存储上的 avro 文件格式

databricks avro 架构无法转换为 Spark SQL 结构类型