如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)
Posted
技术标签:
【中文标题】如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)【英文标题】:How can I add several columns (still not populated) to the DataFrame in Spark Structured Streaming 【发布时间】:2021-12-28 20:09:05 【问题描述】:我有一个带有标准 Kafka 架构的 Kafka 流。我想添加一堆列以使该流可以合并。我想重用架构变量
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("Energy Data", StringType, nullable = false) ::
StructField("Distance", StringType, nullable = false) ::
StructField("Humidity", StringType, nullable = false) ::
StructField("Ambient Temperature", StringType, nullable = false) ::
StructField("Cold Water Temperature", StringType, nullable = false) ::
StructField("Vibration Value 1", StringType, nullable = false) ::
StructField("Vibration Value 2", StringType, nullable = false) ::
StructField("Handle Movement", StringType, nullable = false) ::
StructField("Make Coffee", StringType, nullable = false) ::
Nil)
有没有类似的
.withColumns(schema)
不是复制结构,而是重用与要添加的列列表源相同的架构?
更新:
val iter=schema.iterator
while(iter.hasNext)
controlDataFrame=controlDataFrame.withColumn(iter.next.name,lit(""))
为我工作
【问题讨论】:
【参考方案1】:也许您可以尝试以下方法:
xs.withColumn("y", lit(null).cast(StringType))
添加空列。
您可以从 xs.schema
获取架构,但如果您想重用原始变量,我不确定这是否能解决您的问题。
【讨论】:
val iter=schema.iterator while(iter.hasNext) controlDataFrame=controlDataFrame.withColumn(iter.next.name,lit("")) 为我工作以上是关于如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python 在 Spark Structured Streaming 中查看特定指标
如何在 Spark Structured Streaming 中控制输出文件的大小
如何在 Spark Structured Streaming 中向 DataFrame 添加几列(仍未填充)
如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源
如何从 Spark Structured Streaming 刷新 Hive/Impala 表?
如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?