使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败

Posted

技术标签:

【中文标题】使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败【英文标题】:Reading avro data with Databricks from Azure Data Lake Gen1 generated by Azure EventHubs Capture fails 【发布时间】:2019-12-01 15:39:14 【问题描述】:

我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)

以下语句失败

rawData.count()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file

EventHub-Capture 是否写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?

【问题讨论】:

【参考方案1】:

实现冷摄取路径的一种模式是使用Event Hubs Capture。 EventHubs 捕获每个分区写入一个文件,如windowing parameters 所定义。数据以 avro 格式编写,可以使用 Apache Spark 进行分析。

那么使用此功能的最佳做法是什么?

1.不要过度分区

我经常看到人们使用默认配置,这最终导致很多小文件。如果您想使用 Spark 使用通过 EventHubs Capture 摄取的数据,请记住使用 Spark 的 file sizes in Azure Data Lake Store 和 partitions 的最佳实践。文件大小应为 ~256 MB,分区大小应在 10 到 50 GB 之间。因此,最终配置取决于您正在使用的消息的数量和大小。在大多数情况下,您只需按摄取日期对数据进行分区就可以了。

2。勾选“不发出空文件选项”

您应该选中“不发出空文件选项”。如果你想用 Spark 消费数据,节省不必要的文件操作。

3.在文件路径中使用数据源

借助流式架构,您的 EventHub 就像在面向批处理的架构方法中的着陆区。因此,您将在原始数据层中摄取数据。好的做法是在目录路径中使用数据源而不是 EventHub 的名称。因此,例如,如果您从工厂中的机器人获取遥测数据,这可能是目录路径 /raw/robots/

存储命名需要使用 Namesapce、PartitionId 等所有属性。因此,最后一个良好的捕获文件格式定义具有明确定义的路径、每日分区和使用 Azure Data Lake Gen 2 中文件名的剩余属性可能如下所示:

 /raw/robots/ingest_date=Year-Month-Day/HourMinuteSecond-Namespace-EventHub-PartitionId

4.想想压实工作

捕获的数据未压缩,并且在您的用例中也可能最终变成小文件(因为最低写入频率为 15 分钟)。因此,如有必要,请编写一个每天运行一次的压缩作业。类似的东西

df.repartition(5).write.format("avro").save(targetpath)

会做这项工作。

那么现在读取捕获数据的最佳做法是什么?

5.忽略读取数据的非 avro 文件

Azure EventHubs Capture 将临时数据写入 Azure Data Lake Gen1。最佳做法是仅使用 avro-extension 读取数据。您可以通过 spark 配置轻松实现此目的:

spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")

6.只读相关分区

考虑只读取相关的分区,例如。 G。过滤当前摄取日期。

7.使用共享元数据

读取捕获的数据与直接从 Azure EventHubs 读取数据类似。 所以你必须有一个模式。假设您还有使用 Spark 结构化流直接读取数据的作业,一个好的模式是存储元数据并共享它。您可以将此元数据存储在 Data Lake Store json 文件中:

["MeasurementTS":"timestamp","Location":"string", "Temperature":"double"]

并用这个simple parsing function阅读它:

# parse the metadata to get the schema
from collections import OrderedDict 
from pyspark.sql.types import *
import json

ds = dbutils.fs.head (metadata)                                                 # read metadata file

items = (json
  .JSONDecoder(object_pairs_hook=OrderedDict)
  .decode(ds)[0].items())

#Schema mapping 
mapping = "string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType

schema = StructType([
    StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])

所以你可以重用你的架构:

from pyspark.sql.functions import *

parsedData = spark.read.format("avro").load(rawpath). \
  selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
 .select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
 .select("EnqueuedTimeUtc", "data.*")

【讨论】:

【参考方案2】:

确保输入数据是“.avro”文件。

由于 spark-avro 模块是外部的,因此 DataFrameReader 或 DataFrameWriter 中没有 .avro API。

要以 Avro 格式加载/保存数据,您需要将数据源选项格式指定为 avro(或 org.apache.spark.sql.avro)。

例子:

Python
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")

#storage->avro
avroDf = spark.read.format("com.databricks.spark.avro").load(in_path)

更多详情,请参考以下链接:

https://spark.apache.org/docs/latest/sql-data-sources-avro.html

http://blog.itaysk.com/2017/01/14/processing-event-hub-capture-files-using-spark

https://medium.com/@caiomsouza/processing-event-hubs-capture-files-avro-format-using-spark-azure-databricks-save-to-parquet-95259001d85f

希望这会有所帮助。

【讨论】:

以上是关于使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败的主要内容,如果未能解决你的问题,请参考以下文章

第一次将内容添加到azure event hubs

使用 .Net 核心应用程序发送到 EventHubs 时如何优化吞吐量

带有 IoT 中心的 Azure Functions 无法检索分区

无法从 spark 读取 Azure Eventhub 主题

Azure 事件中心吞吐量

Android Samsung ACTION_IMAGE_CAPTURE 未保存为正确的文件名