使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败
Posted
技术标签:
【中文标题】使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败【英文标题】:Reading avro data with Databricks from Azure Data Lake Gen1 generated by Azure EventHubs Capture fails 【发布时间】:2019-12-01 15:39:14 【问题描述】:我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:
inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)
以下语句失败
rawData.count()
与
org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file
EventHub-Capture 是否写入非 Avro 数据?是否有使用 Spark 读取 EventHub 捕获数据的最佳实践?
【问题讨论】:
【参考方案1】:实现冷摄取路径的一种模式是使用Event Hubs Capture。 EventHubs 捕获每个分区写入一个文件,如windowing parameters 所定义。数据以 avro 格式编写,可以使用 Apache Spark 进行分析。
那么使用此功能的最佳做法是什么?
1.不要过度分区
我经常看到人们使用默认配置,这最终导致很多小文件。如果您想使用 Spark 使用通过 EventHubs Capture 摄取的数据,请记住使用 Spark 的 file sizes in Azure Data Lake Store 和 partitions 的最佳实践。文件大小应为 ~256 MB,分区大小应在 10 到 50 GB 之间。因此,最终配置取决于您正在使用的消息的数量和大小。在大多数情况下,您只需按摄取日期对数据进行分区就可以了。
2。勾选“不发出空文件选项”
您应该选中“不发出空文件选项”。如果你想用 Spark 消费数据,节省不必要的文件操作。
3.在文件路径中使用数据源
借助流式架构,您的 EventHub 就像在面向批处理的架构方法中的着陆区。因此,您将在原始数据层中摄取数据。好的做法是在目录路径中使用数据源而不是 EventHub 的名称。因此,例如,如果您从工厂中的机器人获取遥测数据,这可能是目录路径 /raw/robots/
存储命名需要使用 Namesapce、PartitionId 等所有属性。因此,最后一个良好的捕获文件格式定义具有明确定义的路径、每日分区和使用 Azure Data Lake Gen 2 中文件名的剩余属性可能如下所示:
/raw/robots/ingest_date=Year-Month-Day/HourMinuteSecond-Namespace-EventHub-PartitionId
4.想想压实工作
捕获的数据未压缩,并且在您的用例中也可能最终变成小文件(因为最低写入频率为 15 分钟)。因此,如有必要,请编写一个每天运行一次的压缩作业。类似的东西
df.repartition(5).write.format("avro").save(targetpath)
会做这项工作。
那么现在读取捕获数据的最佳做法是什么?
5.忽略读取数据的非 avro 文件
Azure EventHubs Capture 将临时数据写入 Azure Data Lake Gen1。最佳做法是仅使用 avro-extension 读取数据。您可以通过 spark 配置轻松实现此目的:
spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")
6.只读相关分区
考虑只读取相关的分区,例如。 G。过滤当前摄取日期。
7.使用共享元数据
读取捕获的数据与直接从 Azure EventHubs 读取数据类似。 所以你必须有一个模式。假设您还有使用 Spark 结构化流直接读取数据的作业,一个好的模式是存储元数据并共享它。您可以将此元数据存储在 Data Lake Store json 文件中:
["MeasurementTS":"timestamp","Location":"string", "Temperature":"double"]
并用这个simple parsing function阅读它:
# parse the metadata to get the schema
from collections import OrderedDict
from pyspark.sql.types import *
import json
ds = dbutils.fs.head (metadata) # read metadata file
items = (json
.JSONDecoder(object_pairs_hook=OrderedDict)
.decode(ds)[0].items())
#Schema mapping
mapping = "string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType
schema = StructType([
StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])
所以你可以重用你的架构:
from pyspark.sql.functions import *
parsedData = spark.read.format("avro").load(rawpath). \
selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
.select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
.select("EnqueuedTimeUtc", "data.*")
【讨论】:
【参考方案2】:确保输入数据是“.avro”文件。
由于 spark-avro 模块是外部的,因此 DataFrameReader 或 DataFrameWriter 中没有 .avro API。
要以 Avro 格式加载/保存数据,您需要将数据源选项格式指定为 avro(或 org.apache.spark.sql.avro)。
例子:
Python
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
或
#storage->avro
avroDf = spark.read.format("com.databricks.spark.avro").load(in_path)
更多详情,请参考以下链接:
https://spark.apache.org/docs/latest/sql-data-sources-avro.html
http://blog.itaysk.com/2017/01/14/processing-event-hub-capture-files-using-spark
https://medium.com/@caiomsouza/processing-event-hubs-capture-files-avro-format-using-spark-azure-databricks-save-to-parquet-95259001d85f
希望这会有所帮助。
【讨论】:
以上是关于使用 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 中的 Databricks 读取 avro 数据失败的主要内容,如果未能解决你的问题,请参考以下文章
使用 .Net 核心应用程序发送到 EventHubs 时如何优化吞吐量
带有 IoT 中心的 Azure Functions 无法检索分区