如何在 PySpark 中读取 Avro 文件

Posted

技术标签:

【中文标题】如何在 PySpark 中读取 Avro 文件【英文标题】:How to read Avro file in PySpark 【发布时间】:2015-06-27 21:17:42 【问题描述】:

我正在使用 python 编写一个 spark 作业。但是,我需要阅读一大堆 avro 文件。

This 是我在 Spark 的示例文件夹中找到的最接近的解决方案。但是,您需要使用 spark-submit 提交此 python 脚本。在 spark-submit 的命令行中,您可以指定驱动程序类,在这种情况下,您所有的 avrokey、avrovalue 类都将被定位。

avro_rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)

在我的情况下,我需要在 Python 脚本中运行所有内容,我尝试创建一个环境变量来包含 jar 文件,手指交叉 Python 会将 jar 添加到路径中,但显然不是,它给出我意外的类错误。

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"

谁能帮助我如何在一个 python 脚本中读取 avro 文件?

【问题讨论】:

【参考方案1】:

Spark >= 2.4.0

您可以使用built-in Avro support。该 API 向后兼容 spark-avro 包,并添加了一些内容(最值得注意的是 from_avro / to_avro 函数)。

请注意,该模块未与标准 Spark 二进制文件捆绑在一起,必须使用 spark.jars.packages 或等效机制来包含。

另见Pyspark 2.4.0, read avro from kafka with read stream - Python

火花

您可以使用spark-avro 库。首先让我们创建一个示例数据集:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter

schema_string ='''"namespace": "example.avro",
 "type": "record",
 "name": "KeyValue",
 "fields": [
     "name": "key", "type": "string",
     "name": "value",  "type": ["int", "null"]
 ]
'''

schema = avro.schema.parse(schema_string)

with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append("key": "foo", "value": -1)
    wrt.append("key": "bar", "value": 1)

使用spark-csv 阅读它就像这样简单:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()

## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+ 

【讨论】:

能否提供pysparkfrom_avro 的示例? 如果我错了请纠正我,但看起来内置的 from_avroto_avro 函数在 PySpark 2.4.x 中尚不可用。根据@since 标签here,看起来这些正在PySpark 3.0 中添加。 @mattjw Pyspark 2.4.0, read avro from kafka with read stream - Python【参考方案2】:

前一种解决方案需要安装第三方 Java 依赖项,这不是大多数 Python 开发人员所满意的。但是,如果您只想用给定的模式解析 Avro 文件,那么您实际上并不需要外部库。您可以只读取二进制文件并使用您最喜欢的 python Avro 包解析它们。

例如,这是您可以使用 fastavro 加载 Avro 文件的方式:

from io import BytesIO
import fastavro

schema = 
    ...


rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))

print(rdd.collect())

【讨论】:

【参考方案3】:

对于 Spark

解决方法如下:

avsc_location = hdfs://user/test/test.avsc
avro_location = hdfs://user/test/test.avro

#use subprocess module
import subproccess as SP

load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()

avro_df = spark.read.format("com.databricks.spark.avro").option("avroSchema", avsc_file_output).load(avro_location)

【讨论】:

以上是关于如何在 PySpark 中读取 Avro 文件的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 从 s3 读取/加载 avro 文件

Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python

在 pyspark 中无法读取 avro 格式问题

如何读取大的avro文件,并将整个文件加载到内存中。

如何使用 spark-avro 包从 spark-shell 读取 avro 文件?

如何在python中提取avro文件的模式