如何从 ADLS 将自定义数据框写入 eventthub

Posted

技术标签:

【中文标题】如何从 ADLS 将自定义数据框写入 eventthub【英文标题】:How to write custom dataframe to eventhub from ADLS 【发布时间】:2022-01-19 07:57:50 【问题描述】:

我想将自定义数据框写入 eventthub。

val customDf = spark.read.json("path/to/json")

EventHub ConnectionString
val connectionString = new com.microsoft.azure.eventhubs.ConnectionStringBuilder("Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxx=").setEventHubName("test")

val ehConf = EventHubsConf(connectionString.toString).setConsumerGroup("testing")
val eventhubSchema = spark.readStream.format("eventhubs").options(ehConf.toMap).option("eventhubs.partition.count", "4").load()

eventhubSchema.printSchema 

将显示 eventthub body 的默认架构

现在我想将上面的 customDf 写入 eventthub

Method1:
    ds = customDf \
      .selectExpr("partitionKey", "body") \
      .writeStream \
      .format("eventhubs") \
      .options(ehConf.toMap) \
      .option("checkpointLocation", "///output.txt") \
      .start()

方法二:

ds = customDf \          
  .writeStream \
  .format("eventhubs") \
  .options(ehConf.toMap) \
  .option("checkpointLocation", "///output.txt") \
  .start()

如何将 customDf 写入 eventthub。我什至做了 select(get_json_object(cast to strong type) 但我得到了

org.apache.spark.sql.AnalysisException: cannot resolve 'body' given input columns 

如何将customDf写入eventthub

【问题讨论】:

【参考方案1】:

您需要将数据框中的数据转换为单列对象 - 二进制或字符串 - 这实际上取决于您的消费者。最简单的方法是将所有数据打包为 JSON,使用 to_json + struct 函数的组合:

import pyspark.sql.functions as F

stream = customDf \
      .select(F.to_json(F.struct("*")).alias("body")) \
      .writeStream \
      .format("eventhubs") \
      .options(ehConf.toMap) \
      .option("checkpointLocation", "...") \
      .start()

【讨论】:

太棒了!这样可行!我的整个架构(嵌套结构)变成了一个列对象。谢谢!

以上是关于如何从 ADLS 将自定义数据框写入 eventthub的主要内容,如果未能解决你的问题,请参考以下文章

如何将自定义函数应用于每行的熊猫数据框[重复]

如何使用 Azure databricks 通过 ADLS gen 2 中的多个工作表读取和写入 excel 数据

Spark scala Dataframe:如何将自定义类型应用于现有数据框?

从 databrick 在 adls gen 1 中写入 tsv 文件时行分隔符更改

单击提交按钮时将自定义文本从选择框提交到文本区域

如何将自定义格式(货币)应用于基于组合框选择的文本框?