如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)

Posted

技术标签:

【中文标题】如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)【英文标题】:How can I add several columns (still not populated) to the DataFrame in Spark Structured Streaming 【发布时间】:2021-12-28 20:09:05 【问题描述】:

我有一个带有标准 Kafka 架构的 Kafka 流。我想添加一堆列以使该流可以合并。我想重用架构变量

val schema = StructType(
    StructField("id", LongType, nullable = false) ::
      StructField("Energy Data", StringType, nullable = false) ::
      StructField("Distance", StringType, nullable = false) ::
      StructField("Humidity", StringType, nullable = false) ::
      StructField("Ambient Temperature", StringType, nullable = false) ::
      StructField("Cold Water Temperature", StringType, nullable = false) ::
      StructField("Vibration Value 1", StringType, nullable = false) ::
      StructField("Vibration Value 2", StringType, nullable = false) ::
      StructField("Handle Movement", StringType, nullable = false) ::
      StructField("Make Coffee", StringType, nullable = false) ::
      Nil)

有没有类似的

.withColumns(schema)

不是复制结构,而是重用与要添加的列列表源相同的架构?

更新:

val iter=schema.iterator
    while(iter.hasNext)
      
        controlDataFrame=controlDataFrame.withColumn(iter.next.name,lit(""))
      

为我工作

【问题讨论】:

【参考方案1】:

也许您可以尝试以下方法:

xs.withColumn("y", lit(null).cast(StringType))

添加空列。 您可以从 xs.schema 获取架构,但如果您想重用原始变量,我不确定这是否能解决您的问题。

【讨论】:

val iter=schema.iterator while(iter.hasNext) controlDataFrame=controlDataFrame.withColumn(iter.next.name,lit("")) 为我工作

以上是关于如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

如何在 Spark Structured Streaming 中控制输出文件的大小

如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)

如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源

如何从 Spark Structured Streaming 刷新 Hive/Impala 表?

如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?