如何从 ADLS 将自定义数据框写入 eventthub
Posted
技术标签:
【中文标题】如何从 ADLS 将自定义数据框写入 eventthub【英文标题】:How to write custom dataframe to eventhub from ADLS 【发布时间】:2022-01-19 07:57:50 【问题描述】:我想将自定义数据框写入 eventthub。
val customDf = spark.read.json("path/to/json")
EventHub ConnectionString
val connectionString = new com.microsoft.azure.eventhubs.ConnectionStringBuilder("Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxx=").setEventHubName("test")
val ehConf = EventHubsConf(connectionString.toString).setConsumerGroup("testing")
val eventhubSchema = spark.readStream.format("eventhubs").options(ehConf.toMap).option("eventhubs.partition.count", "4").load()
eventhubSchema.printSchema
将显示 eventthub body 的默认架构
现在我想将上面的 customDf 写入 eventthub
Method1:
ds = customDf \
.selectExpr("partitionKey", "body") \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "///output.txt") \
.start()
方法二:
ds = customDf \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "///output.txt") \
.start()
如何将 customDf 写入 eventthub。我什至做了 select(get_json_object(cast to strong type) 但我得到了
org.apache.spark.sql.AnalysisException: cannot resolve 'body' given input columns
如何将customDf写入eventthub
【问题讨论】:
【参考方案1】:您需要将数据框中的数据转换为单列对象 - 二进制或字符串 - 这实际上取决于您的消费者。最简单的方法是将所有数据打包为 JSON,使用 to_json
+ struct
函数的组合:
import pyspark.sql.functions as F
stream = customDf \
.select(F.to_json(F.struct("*")).alias("body")) \
.writeStream \
.format("eventhubs") \
.options(ehConf.toMap) \
.option("checkpointLocation", "...") \
.start()
【讨论】:
太棒了!这样可行!我的整个架构(嵌套结构)变成了一个列对象。谢谢!以上是关于如何从 ADLS 将自定义数据框写入 eventthub的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Azure databricks 通过 ADLS gen 2 中的多个工作表读取和写入 excel 数据
Spark scala Dataframe:如何将自定义类型应用于现有数据框?