使用 spark 结构化流从 s3 读取 avro 文件

Posted

技术标签:

【中文标题】使用 spark 结构化流从 s3 读取 avro 文件【英文标题】:Reading avro files from s3 with spark structured streaming 【发布时间】:2017-10-20 13:43:41 【问题描述】:

我想使用 Spark 结构化流 API 从 s3 读取 Avro 文件。您可以找到有关使用 Kafka 执行此操作的信息,但我找不到 s3 的任何内容。这里的问题是我不知道要设置什么格式。这是我的简单代码:

 Dataset<Row> baseDataSet = sparkSession            
    .readStream()                              
    .format("?") //What this format should be?                            
    .schema(new StructType()                   
            .add("value", "binary"))           
    .load("s3://path/to/streaming/key")    
    .select(col("value"))
    .map(value -> //do avro deserialization,Encoders.kryo(//deserialization class))                                    
    .writeStream() 
    .trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    .format("console")
    .outputMode("update")
    .start();

我了解结构化流 API 中仍未实现 avro。但是我应该采用什么格式来读取二进制数据,然后以我想要的任何方式(在 map 函数中)反序列化它。

【问题讨论】:

【参考方案1】:

有一个third-party package for avro。您可以通过指定format("com.databricks.spark.avro")直接下载jar并与spark一起使用它来加载avro文件。

目前无法读取结构化流中的整个文件以在以后应用反序列化。

但是,如果您仍然想要自定义反序列化器,您可以通过实现 trait DataSourceRegister 来开发自定义数据源。例如,您可能想检查spark-avro package。

如果您需要将输入数据转换为字节数组,您可以使用以下内容:

session
    .readStream()
    .textFile("path-to-folder")
    .as(Encoders.BINARY())
    .map(bytesToStringMapper, Encoders.STRING())
    .writeStream()
    .outputMode(OutputMode.Append())
    .format("text")
    .option("path", "path-to-folder")
    .option("checkpointLocation", "path-to-folder")
    .queryName("test-query")
    .start();

当前方法将文件作为文本逐行加载。这意味着bytesToStringMapper 接收单行作为字节数组并将其转换为字符串。

【讨论】:

如何将此第三方软件包添加到胶水作业中?? 使用--extra-jars 参数(docs.aws.amazon.com/glue/latest/dg/…)

以上是关于使用 spark 结构化流从 s3 读取 avro 文件的主要内容,如果未能解决你的问题,请参考以下文章

在 emr 中使用 spark 从 S3 读取 avro 失败

使用 pyspark 从 s3 读取/加载 avro 文件

使用 C# 从 AWS 上的 S3 读取 Avro 数据

如何使用 spark-avro 包从 spark-shell 读取 avro 文件?

Spark:使用 Spark Scala 从 Kafka 读取 Avro 消息

0016-Avro序列化&反序列化和Spark读取Avro数据